You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/11/06 20:18:23 UTC

svn commit: r1539433 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/client/

Author: liyin
Date: Wed Nov  6 19:18:23 2013
New Revision: 1539433

URL: http://svn.apache.org/r1539433
Log:
[master] Solve skipping data in HTable scans

Author: maxim

Summary:
The HTable client cannot retry a scan operation in the getRegionServerWithRetries code path.
This will result in the client missing data. This can be worked around by setting hbase.client.retries.number to 1.

The root problem is that Callable knows nothing about retries, and the protocol it follows does not support retries either.
This fix keeps the Callable protocol (an ugly thing worth merciless refactoring) intact but changes
ScannerCallable to anticipate retries.  What we want is for failed operations to act as identities to the outside world:
 N1 * N2 * F3 * N3 * F4 * F4 * N4 ... = N1 * N2 * N3 * N4 ...
where Nk are successful operations and Fk are failed operations.

Test Plan: 1/ Ensure that the new tests in TestScanRetries pass

Reviewers: manukranthk, liyintang, aaiyer

Reviewed By: manukranthk

CC: sfodor, aaiyer, jingweil, junfang, liyintang, peiyue, rishid, rshroff, san

Differential Revision: https://phabricator.fb.com/D1027350

Task ID: 2147851

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=1539433&r1=1539432&r2=1539433&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java Wed Nov  6 19:18:23 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.mortbay.log.Log;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 
 /**
@@ -40,6 +41,9 @@ public class ScannerCallable extends Ser
   private boolean closed = false;
   private Scan scan;
   private int caching = 1;
+  private boolean skipFirstRow = false;
+  private boolean forceReopen = false;
+  private byte[] lastRowSeen = null;
 
   /**
    * @param connection which connection
@@ -58,33 +62,78 @@ public class ScannerCallable extends Ser
   public Result [] call() throws IOException {
     if (scannerId != -1L && closed) {
       close();
-    } else if (scannerId == -1L && !closed) {
+    } else if (scannerId == -1L && !closed && !forceReopen) {
       this.scannerId = openScanner();
     } else {
+      ensureScannerIsOpened();
       Result [] rrs = null;
       try {
-        rrs = server.next(scannerId, caching);
+        rrs = next();
       } catch (IOException e) {
-        IOException ioe = null;
-        if (e instanceof RemoteException) {
-          ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
-        }
-        if (ioe == null) throw new IOException(e);
-        if (ioe instanceof NotServingRegionException) {
-          // Throw a DNRE so that we break out of cycle of calling NSRE
-          // when what we need is to open scanner against new location.
-          // Attach NSRE to signal client that it needs to resetup scanner.
-          throw new DoNotRetryIOException("Reset scanner", ioe);
-        } else {
-          // The outer layers will retry
-          throw ioe;
-        }
+        fixStateOrCancelRetryThanRethrow(e);
       }
       return rrs;
     }
     return null;
   }
 
+  private void fixStateOrCancelRetryThanRethrow(IOException e) throws IOException {
+    IOException ioe = null;
+    if (e instanceof RemoteException) {
+      ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+    }
+    if (ioe == null) {
+      ioe = new IOException(e);
+    }
+    if (ioe instanceof NotServingRegionException) {
+      // Throw a DNRE so that we break out of cycle of calling NSRE
+      // when what we need is to open scanner against new location.
+      // Attach NSRE to signal client that it needs to resetup scanner.
+      throw new DoNotRetryIOException("Reset scanner", ioe);
+    } else {
+      // The outer layers will retry
+      fixStateForFutureRetries();
+      throw ioe;
+    }
+  }
+
+  private Result[] next() throws IOException {
+    Result[] results = server.next(scannerId, caching);
+    if (results != null && results.length > 0) {
+      byte[] lastRow = results[results.length - 1].getRow();
+      if (lastRow.length > 0) {
+        lastRowSeen = lastRow;
+      }
+      if (skipFirstRow) {
+        skipFirstRow = false;
+        // We can't return empty results as it will prematurely signal end of region
+        if (results.length > 1) {
+          results = Arrays.copyOfRange(results, 1, results.length);
+        } else {
+          return next();
+        }
+      }
+    }
+    return results;
+  }
+
+  private void fixStateForFutureRetries() {
+    Log.info("Fixing scan state for future retries");
+    close();
+    if (lastRowSeen != null && lastRowSeen.length > 0) {
+      scan.setStartRow(lastRowSeen);
+      skipFirstRow = true;
+    }
+    forceReopen = true;
+  }
+
+  private void ensureScannerIsOpened() throws IOException {
+    if (forceReopen) {
+      scannerId = openScanner();
+      forceReopen = false;
+    }
+  }
+
   private void close() {
     if (this.scannerId == -1L) {
       return;

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1539433&r1=1539432&r2=1539433&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Wed Nov  6 19:18:23 2013
@@ -363,6 +363,11 @@ public class HBaseTestingUtility {
     return startMiniCluster(1, numSlaves);
   }
 
+  public MiniHBaseCluster startMiniCluster(final int numMasters,
+      final int numSlaves) throws IOException, InterruptedException {
+    return startMiniCluster(numMasters, numSlaves, MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
+  }
+
   /**
    * Start up a minicluster of hbase, optionally dfs, and zookeeper.
    * Modifies Configuration.  Homes the cluster data directory under a random
@@ -380,7 +385,7 @@ public class HBaseTestingUtility {
    * @return Mini hbase cluster instance created.
    */
   public MiniHBaseCluster startMiniCluster(final int numMasters,
-      final int numSlaves) throws IOException, InterruptedException {
+      final int numSlaves, final Class<? extends HRegionServer> regionServerClass) throws IOException, InterruptedException {
     LOG.info("Starting up minicluster");
     // If we already put up a cluster, fail.
     if (miniClusterRunning) {
@@ -413,7 +418,7 @@ public class HBaseTestingUtility {
     this.conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
     fs.mkdirs(hbaseRootdir);
     FSUtils.setVersion(fs, hbaseRootdir);
-    startMiniHBaseCluster(numMasters, numSlaves);
+    startMiniHBaseCluster(numMasters, numSlaves, regionServerClass);
 
     // Don't leave here till we've done a successful scan of the .META.
     HTable t = null;
@@ -439,9 +444,16 @@ public class HBaseTestingUtility {
     return this.hbaseCluster;
   }
 
+
   public void startMiniHBaseCluster(final int numMasters, final int numSlaves)
-      throws IOException, InterruptedException {
-    this.hbaseCluster = new MiniHBaseCluster(this.conf, numMasters, numSlaves);
+    throws IOException, InterruptedException {
+    startMiniHBaseCluster(numMasters, numSlaves, MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
+  }
+
+  public void startMiniHBaseCluster(final int numMasters, final int numSlaves,
+    Class<? extends HRegionServer> regionServerClass)
+  throws IOException, InterruptedException {
+    this.hbaseCluster = new MiniHBaseCluster(this.conf, numMasters, numSlaves, regionServerClass);
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=1539433&r1=1539432&r2=1539433&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/MiniHBaseCluster.java Wed Nov  6 19:18:23 2013
@@ -30,18 +30,18 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.hbase.util.HasThread;
 
 /**
  * This class creates a single process HBase cluster.
@@ -87,6 +87,11 @@ public class MiniHBaseCluster {
   public MiniHBaseCluster(Configuration conf, int numMasters,
       int numRegionServers)
   throws IOException, InterruptedException {
+    this(conf, numMasters, numRegionServers, MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
+  }
+
+  public MiniHBaseCluster(Configuration conf, int numMasters, int numRegionServers, Class regionServerClass)
+    throws IOException, InterruptedException {
     this.conf = conf;
     MiniHBaseCluster.numClusters ++;
     conf.set(HConstants.MASTER_PORT, "0");
@@ -94,7 +99,7 @@ public class MiniHBaseCluster {
         PREFERRED_ASSIGNMENT);
     conf.setLong("hbase.master.holdRegionForBestLocality.period",
         PREFERRED_ASSIGNMENT / 5);
-    init(numMasters, numRegionServers);
+    init(numMasters, numRegionServers, regionServerClass);
   }
 
   /**
@@ -252,13 +257,13 @@ public class MiniHBaseCluster {
     }
   }
 
-  private void init(final int nMasterNodes, final int nRegionNodes)
+  private void init(final int nMasterNodes, final int nRegionNodes, Class regionServerClass)
   throws IOException {
     try {
       // start up a LocalHBaseCluster
       hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, nRegionNodes,
           MiniHBaseCluster.MiniHBaseClusterMaster.class,
-          MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
+          regionServerClass);
       hbaseCluster.startup();
     } catch(IOException e) {
       shutdown();

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java?rev=1539433&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestScanRetries.java Wed Nov  6 19:18:23 2013
@@ -0,0 +1,149 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ParamFormat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests that a scan returns the expected number of rows even when next() calls fail and are retried.
+ */
+public class TestScanRetries {
+
+  private final static HBaseTestingUtility
+          TEST_UTIL = new HBaseTestingUtility();
+
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static byte[] SOME_BYTES = Bytes.toBytes("f");
+  private final static byte[] TABLE_NAME = Bytes.toBytes("t");
+  private final static int NB_ROWS = 6;
+  private final static int SCANNER_TIMEOUT = 100000;
+  private static HTable table;
+  private static boolean enableFailure = false;
+
+  public static class TestRegionServer extends MiniHBaseCluster.MiniHBaseClusterRegionServer {
+    private long nextCallCount = 0l;
+
+    public TestRegionServer(Configuration conf)
+        throws IOException {
+      super(conf);
+    }
+
+    @ParamFormat(clazz = ScanParamsFormatter.class)
+    @Override
+    public Result[] next(final long scannerId, int nbRows) throws IOException {
+      ++nextCallCount;
+      LOG.info("nextCallCount: " + String.valueOf(nextCallCount));
+      if (enableFailure && nextCallCount % 5 == 0) {
+        super.next(scannerId, nbRows);
+        LOG.info("Something bad happened on the way from server to client. Should force retry!");
+        throw new IOException("Forcing retry");
+      }
+      return super.next(scannerId, nbRows);
+    }
+  }
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+      Configuration conf = TEST_UTIL.getConfiguration();
+      conf.setInt("hbase.regionserver.lease.period", SCANNER_TIMEOUT);
+      conf.setInt("hbase.client.retries.number", 5);
+      TEST_UTIL.startMiniCluster(1, 2, TestRegionServer.class);
+      table = TEST_UTIL.createTable(Bytes.toBytes("t"), SOME_BYTES);
+      for (int i = 0; i < NB_ROWS; i++) {
+          Put put = new Put(Bytes.toBytes(i));
+          put.add(SOME_BYTES, SOME_BYTES, SOME_BYTES);
+          table.put(put);
+      }
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    TEST_UTIL.ensureSomeRegionServersAvailable(2);
+  }
+
+  /**
+   * Tests that we have right number of rows in scan
+   * @throws Exception
+   */
+  @Test
+  public void testNumberOfRowsInScanWithoutRetries() throws Exception {
+    enableFailure = false;
+    doTestNumberOfRowsInScan();
+  }
+
+  /**
+   * Tests that we have right number of rows in scan even with retries
+   * @throws Exception
+   */
+  @Test
+  public void testNumberOfRowsInScanWithRetries() throws Exception {
+    enableFailure = true;
+    doTestNumberOfRowsInScan();
+  }
+
+  public void doTestNumberOfRowsInScan() throws Exception {
+    Scan scan = new Scan();
+    ResultScanner r = table.getScanner(scan);
+    int count = 0;
+    try {
+      Result res = r.next();
+      while (res != null) {
+        count++;
+        res = r.next();
+      }
+    } catch (Throwable e) {
+      LOG.error("Got exception " + e.getMessage(), e);
+      fail("Exception while counting rows!");
+    }
+    r.close();
+    LOG.info("Number of rows read: " + String.valueOf(count));
+    assertEquals(NB_ROWS, count);
+  }
+}
+