You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jx...@apache.org on 2013/11/12 01:00:25 UTC

svn commit: r1540887 - in /hbase/branches/0.96/hbase-server/src: main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java

Author: jxiang
Date: Tue Nov 12 00:00:24 2013
New Revision: 1540887

URL: http://svn.apache.org/r1540887
Log:
HBASE-9926 Scanner doesn't check if a region is available

Modified:
    hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java

Modified: hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1540887&r1=1540886&r2=1540887&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.96/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Nov 12 00:00:24 2013
@@ -2625,15 +2625,6 @@ public class HRegionServer implements Cl
   /*
    * @param t
    *
-   * @return Make <code>t</code> an IOE if it isn't already.
-   */
-  protected IOException convertThrowableToIOE(final Throwable t) {
-    return convertThrowableToIOE(t, null);
-  }
-
-  /*
-   * @param t
-   *
    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
    *
    * @return Make <code>t</code> an IOE if it isn't already.
@@ -2688,15 +2679,16 @@ public class HRegionServer implements Cl
     return this.fsOk;
   }
 
-  protected long addScanner(RegionScanner s) throws LeaseStillHeldException {
+  protected long addScanner(RegionScanner s, HRegion r) throws LeaseStillHeldException {
     long scannerId = -1;
     while (true) {
       scannerId = Math.abs(rand.nextLong() << 24) ^ startcode;
       String scannerName = String.valueOf(scannerId);
-      RegionScannerHolder existing = scanners.putIfAbsent(scannerName, new RegionScannerHolder(s));
+      RegionScannerHolder existing =
+        scanners.putIfAbsent(scannerName, new RegionScannerHolder(s, r));
       if (existing == null) {
         this.leases.createLease(scannerName, this.scannerLeaseTimeoutPeriod,
-            new ScannerListener(scannerName));
+          new ScannerListener(scannerName));
         break;
       }
     }
@@ -2949,191 +2941,192 @@ public class HRegionServer implements Cl
       }
       requestCount.increment();
 
-      try {
-        int ttl = 0;
-        HRegion region = null;
-        RegionScanner scanner = null;
-        RegionScannerHolder rsh = null;
-        boolean moreResults = true;
-        boolean closeScanner = false;
-        ScanResponse.Builder builder = ScanResponse.newBuilder();
-        if (request.hasCloseScanner()) {
-          closeScanner = request.getCloseScanner();
-        }
-        int rows = 1;
-        if (request.hasNumberOfRows()) {
-          rows = request.getNumberOfRows();
+      int ttl = 0;
+      HRegion region = null;
+      RegionScanner scanner = null;
+      RegionScannerHolder rsh = null;
+      boolean moreResults = true;
+      boolean closeScanner = false;
+      ScanResponse.Builder builder = ScanResponse.newBuilder();
+      if (request.hasCloseScanner()) {
+        closeScanner = request.getCloseScanner();
+      }
+      int rows = 1;
+      if (request.hasNumberOfRows()) {
+        rows = request.getNumberOfRows();
+      }
+      if (request.hasScannerId()) {
+        rsh = scanners.get(scannerName);
+        if (rsh == null) {
+          LOG.info("Client tried to access missing scanner " + scannerName);
+          throw new UnknownScannerException(
+            "Name: " + scannerName + ", already closed?");
+        }
+        scanner = rsh.s;
+        HRegionInfo hri = scanner.getRegionInfo();
+        region = getRegion(hri.getRegionName());
+        if (region != rsh.r) { // Yes, should be the same instance
+          throw new NotServingRegionException("Region was re-opened after the scanner"
+            + scannerName + " was created: " + hri.getRegionNameAsString());
         }
-        if (request.hasScannerId()) {
-          rsh = scanners.get(scannerName);
-          if (rsh == null) {
-            LOG.info("Client tried to access missing scanner " + scannerName);
-            throw new UnknownScannerException(
-              "Name: " + scannerName + ", already closed?");
-          }
-          scanner = rsh.s;
-          region = getRegion(scanner.getRegionInfo().getRegionName());
-        } else {
-          region = getRegion(request.getRegion());
-          ClientProtos.Scan protoScan = request.getScan();
-          boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
-          Scan scan = ProtobufUtil.toScan(protoScan);
-          // if the request doesn't set this, get the default region setting.
-          if (!isLoadingCfsOnDemandSet) {
-            scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
-          }
-          scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
-          region.prepareScanner(scan);
-          if (region.getCoprocessorHost() != null) {
-            scanner = region.getCoprocessorHost().preScannerOpen(scan);
-          }
-          if (scanner == null) {
-            scanner = region.getScanner(scan);
-          }
-          if (region.getCoprocessorHost() != null) {
-            scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
-          }
-          scannerId = addScanner(scanner);
-          scannerName = String.valueOf(scannerId);
-          ttl = this.scannerLeaseTimeoutPeriod;
+      } else {
+        region = getRegion(request.getRegion());
+        ClientProtos.Scan protoScan = request.getScan();
+        boolean isLoadingCfsOnDemandSet = protoScan.hasLoadColumnFamiliesOnDemand();
+        Scan scan = ProtobufUtil.toScan(protoScan);
+        // if the request doesn't set this, get the default region setting.
+        if (!isLoadingCfsOnDemandSet) {
+          scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());
+        }
+        scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);
+        region.prepareScanner(scan);
+        if (region.getCoprocessorHost() != null) {
+          scanner = region.getCoprocessorHost().preScannerOpen(scan);
+        }
+        if (scanner == null) {
+          scanner = region.getScanner(scan);
         }
+        if (region.getCoprocessorHost() != null) {
+          scanner = region.getCoprocessorHost().postScannerOpen(scan, scanner);
+        }
+        scannerId = addScanner(scanner, region);
+        scannerName = String.valueOf(scannerId);
+        ttl = this.scannerLeaseTimeoutPeriod;
+      }
 
-        if (rows > 0) {
-          // if nextCallSeq does not match throw Exception straight away. This needs to be
-          // performed even before checking of Lease.
-          // See HBASE-5974
-          if (request.hasNextCallSeq()) {
-            if (rsh == null) {
-              rsh = scanners.get(scannerName);
-            }
-            if (rsh != null) {
-              if (request.getNextCallSeq() != rsh.nextCallSeq) {
-                throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
-                  + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
-                  "; request=" + TextFormat.shortDebugString(request));
-              }
-              // Increment the nextCallSeq value which is the next expected from client.
-              rsh.nextCallSeq++;
+      if (rows > 0) {
+        // if nextCallSeq does not match throw Exception straight away. This needs to be
+        // performed even before checking of Lease.
+        // See HBASE-5974
+        if (request.hasNextCallSeq()) {
+          if (rsh == null) {
+            rsh = scanners.get(scannerName);
+          }
+          if (rsh != null) {
+            if (request.getNextCallSeq() != rsh.nextCallSeq) {
+              throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+                + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
+                "; request=" + TextFormat.shortDebugString(request));
             }
+            // Increment the nextCallSeq value which is the next expected from client.
+            rsh.nextCallSeq++;
           }
-          try {
-            // Remove lease while its being processed in server; protects against case
-            // where processing of request takes > lease expiration time.
-            lease = leases.removeLease(scannerName);
-            List<Result> results = new ArrayList<Result>(rows);
-            long currentScanResultSize = 0;
+        }
+        try {
+          // Remove lease while its being processed in server; protects against case
+          // where processing of request takes > lease expiration time.
+          lease = leases.removeLease(scannerName);
+          List<Result> results = new ArrayList<Result>(rows);
+          long currentScanResultSize = 0;
 
-            boolean done = false;
-            // Call coprocessor. Get region info from scanner.
-            if (region != null && region.getCoprocessorHost() != null) {
-              Boolean bypass = region.getCoprocessorHost().preScannerNext(
-                scanner, results, rows);
-              if (!results.isEmpty()) {
-                for (Result r : results) {
-                  if (maxScannerResultSize < Long.MAX_VALUE){
-                    for (Cell kv : r.rawCells()) {
-                      // TODO
-                      currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
-                    }
+          boolean done = false;
+          // Call coprocessor. Get region info from scanner.
+          if (region != null && region.getCoprocessorHost() != null) {
+            Boolean bypass = region.getCoprocessorHost().preScannerNext(
+              scanner, results, rows);
+            if (!results.isEmpty()) {
+              for (Result r : results) {
+                if (maxScannerResultSize < Long.MAX_VALUE){
+                  for (Cell kv : r.rawCells()) {
+                    // TODO
+                    currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
                   }
                 }
               }
-              if (bypass != null && bypass.booleanValue()) {
-                done = true;
-              }
             }
+            if (bypass != null && bypass.booleanValue()) {
+              done = true;
+            }
+          }
 
-            if (!done) {
-              long maxResultSize = scanner.getMaxResultSize();
-              if (maxResultSize <= 0) {
-                maxResultSize = maxScannerResultSize;
-              }
-              List<Cell> values = new ArrayList<Cell>();
-              MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
-              region.startRegionOperation(Operation.SCAN);
-              try {
-                int i = 0;
-                synchronized(scanner) {
-                  for (; i < rows
-                      && currentScanResultSize < maxResultSize; i++) {
-                    // Collect values to be returned here
-                    boolean moreRows = scanner.nextRaw(values);
-                    if (!values.isEmpty()) {
-                      if (maxScannerResultSize < Long.MAX_VALUE){
-                        for (Cell kv : values) {
-                          currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
-                        }
+          if (!done) {
+            long maxResultSize = scanner.getMaxResultSize();
+            if (maxResultSize <= 0) {
+              maxResultSize = maxScannerResultSize;
+            }
+            List<Cell> values = new ArrayList<Cell>();
+            MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());
+            region.startRegionOperation(Operation.SCAN);
+            try {
+              int i = 0;
+              synchronized(scanner) {
+                for (; i < rows
+                    && currentScanResultSize < maxResultSize; i++) {
+                  // Collect values to be returned here
+                  boolean moreRows = scanner.nextRaw(values);
+                  if (!values.isEmpty()) {
+                    if (maxScannerResultSize < Long.MAX_VALUE){
+                      for (Cell kv : values) {
+                        currentScanResultSize += KeyValueUtil.ensureKeyValue(kv).heapSize();
                       }
-                      results.add(Result.create(values));
                     }
-                    if (!moreRows) {
-                      break;
-                    }
-                    values.clear();
+                    results.add(Result.create(values));
+                  }
+                  if (!moreRows) {
+                    break;
                   }
+                  values.clear();
                 }
-                region.readRequestsCount.add(i);
-              } finally {
-                region.closeRegionOperation();
-              }
-
-              // coprocessor postNext hook
-              if (region != null && region.getCoprocessorHost() != null) {
-                region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
               }
+              region.readRequestsCount.add(i);
+            } finally {
+              region.closeRegionOperation();
             }
 
-            // If the scanner's filter - if any - is done with the scan
-            // and wants to tell the client to stop the scan. This is done by passing
-            // a null result, and setting moreResults to false.
-            if (scanner.isFilterDone() && results.isEmpty()) {
-              moreResults = false;
-              results = null;
-            } else {
-              addResults(builder, results, controller);
-            }
-          } finally {
-            // We're done. On way out re-add the above removed lease.
-            // Adding resets expiration time on lease.
-            if (scanners.containsKey(scannerName)) {
-              if (lease != null) leases.addLease(lease);
-              ttl = this.scannerLeaseTimeoutPeriod;
+            // coprocessor postNext hook
+            if (region != null && region.getCoprocessorHost() != null) {
+              region.getCoprocessorHost().postScannerNext(scanner, results, rows, true);
             }
           }
-        }
 
-        if (!moreResults || closeScanner) {
-          ttl = 0;
-          moreResults = false;
-          if (region != null && region.getCoprocessorHost() != null) {
-            if (region.getCoprocessorHost().preScannerClose(scanner)) {
-              return builder.build(); // bypass
-            }
+          // If the scanner's filter - if any - is done with the scan
+          // and wants to tell the client to stop the scan. This is done by passing
+          // a null result, and setting moreResults to false.
+          if (scanner.isFilterDone() && results.isEmpty()) {
+            moreResults = false;
+            results = null;
+          } else {
+            addResults(builder, results, controller);
           }
-          rsh = scanners.remove(scannerName);
-          if (rsh != null) {
-            scanner = rsh.s;
-            scanner.close();
-            leases.cancelLease(scannerName);
-            if (region != null && region.getCoprocessorHost() != null) {
-              region.getCoprocessorHost().postScannerClose(scanner);
-            }
+        } finally {
+          // We're done. On way out re-add the above removed lease.
+          // Adding resets expiration time on lease.
+          if (scanners.containsKey(scannerName)) {
+            if (lease != null) leases.addLease(lease);
+            ttl = this.scannerLeaseTimeoutPeriod;
           }
         }
+      }
 
-        if (ttl > 0) {
-          builder.setTtl(ttl);
+      if (!moreResults || closeScanner) {
+        ttl = 0;
+        moreResults = false;
+        if (region != null && region.getCoprocessorHost() != null) {
+          if (region.getCoprocessorHost().preScannerClose(scanner)) {
+            return builder.build(); // bypass
+          }
         }
-        builder.setScannerId(scannerId);
-        builder.setMoreResults(moreResults);
-        return builder.build();
-      } catch (Throwable t) {
-        if (scannerName != null && t instanceof NotServingRegionException) {
-          scanners.remove(scannerName);
+        rsh = scanners.remove(scannerName);
+        if (rsh != null) {
+          scanner = rsh.s;
+          scanner.close();
+          leases.cancelLease(scannerName);
+          if (region != null && region.getCoprocessorHost() != null) {
+            region.getCoprocessorHost().postScannerClose(scanner);
+          }
         }
-        throw convertThrowableToIOE(cleanup(t));
       }
+
+      if (ttl > 0) {
+        builder.setTtl(ttl);
+      }
+      builder.setScannerId(scannerId);
+      builder.setMoreResults(moreResults);
+      return builder.build();
     } catch (IOException ie) {
+      if (scannerName != null && ie instanceof NotServingRegionException) {
+        scanners.remove(scannerName);
+      }
       throw new ServiceException(ie);
     }
   }
@@ -4300,9 +4293,11 @@ public class HRegionServer implements Cl
   private static class RegionScannerHolder {
     private RegionScanner s;
     private long nextCallSeq = 0L;
+    private HRegion r;
 
-    public RegionScannerHolder(RegionScanner s) {
+    public RegionScannerHolder(RegionScanner s, HRegion r) {
       this.s = s;
+      this.r = r;
     }
   }
 

Modified: hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java?rev=1540887&r1=1540886&r2=1540887&view=diff
==============================================================================
--- hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java (original)
+++ hbase/branches/0.96/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java Tue Nov 12 00:00:24 2013
@@ -26,12 +26,23 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
 import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
+import org.apache.hadoop.hbase.master.HMaster;
+import org.apache.hadoop.hbase.master.RegionState.State;
+import org.apache.hadoop.hbase.master.RegionStates;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -424,6 +435,85 @@ public class TestScannersFromClientSide 
        "Testing offset + multiple CFs + maxResults");
   }
 
+  /**
+   * Test from client side for scan while the region is reopened
+   * on the same region server.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testScanOnReopenedRegion() throws Exception {
+    byte [] TABLE = Bytes.toBytes("testScanOnReopenedRegion");
+    byte [][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, 2);
+
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILY);
+
+    Put put;
+    Scan scan;
+    Result result;
+    ResultScanner scanner;
+    boolean toLog = false;
+    List<Cell> kvListExp;
+
+    // table: row, family, c0:0, c1:1
+    put = new Put(ROW);
+    for (int i=0; i < QUALIFIERS.length; i++) {
+      KeyValue kv = new KeyValue(ROW, FAMILY, QUALIFIERS[i], i, VALUE);
+      put.add(kv);
+    }
+    ht.put(put);
+
+    scan = new Scan(ROW);
+    scanner = ht.getScanner(scan);
+
+    HRegionLocation loc = ht.getRegionLocation(ROW);
+    HRegionInfo hri = loc.getRegionInfo();
+    MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
+    byte[] regionName = hri.getRegionName();
+    int i = cluster.getServerWith(regionName);
+    HRegionServer rs = cluster.getRegionServer(i);
+    ProtobufUtil.closeRegion(rs, regionName, false);
+    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    long timeOut = 300000;
+    while (true) {
+      if (rs.getOnlineRegion(regionName) == null) {
+        break;
+      }
+      assertTrue("Timed out in closing the testing region",
+        EnvironmentEdgeManager.currentTimeMillis() < startTime + timeOut);
+      Thread.sleep(500);
+    }
+
+    // Now open the region again.
+    ZooKeeperWatcher zkw = TEST_UTIL.getZooKeeperWatcher();
+    try {
+      HMaster master = cluster.getMaster();
+      RegionStates states = master.getAssignmentManager().getRegionStates();
+      states.regionOffline(hri);
+      states.updateRegionState(hri, State.OPENING);
+      ZKAssign.createNodeOffline(zkw, hri, loc.getServerName());
+      ProtobufUtil.openRegion(rs, hri);
+      startTime = EnvironmentEdgeManager.currentTimeMillis();
+      while (true) {
+        if (rs.getOnlineRegion(regionName) != null) {
+          break;
+        }
+        assertTrue("Timed out in open the testing region",
+          EnvironmentEdgeManager.currentTimeMillis() < startTime + timeOut);
+        Thread.sleep(500);
+      }
+    } finally {
+      ZKAssign.deleteNodeFailSilent(zkw, hri);
+    }
+
+    // c0:0, c1:1
+    kvListExp = new ArrayList<Cell>();
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[0], 0, VALUE));
+    kvListExp.add(new KeyValue(ROW, FAMILY, QUALIFIERS[1], 1, VALUE));
+    result = scanner.next();
+    verifyResult(result, kvListExp, toLog, "Testing scan on re-opened region");
+  }
+
   static void verifyResult(Result result, List<Cell> expKvList, boolean toLog,
       String msg) {