You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/08/24 20:18:20 UTC

svn commit: r1517172 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ test/java/org/apache/hadoop/hbase/client/

Author: liyin
Date: Sat Aug 24 18:18:20 2013
New Revision: 1517172

URL: http://svn.apache.org/r1517172
Log:
[HBASE-9325] Add functionality to flush and wait and use it in ClientLocalScanner

Author: manukranthk

Summary: Add functionality to HConnectionManager to flush a region and wait for the flush to happen (flushRegionAndWait).

Test Plan: Unit Test

Reviewers: adela, aaiyer, gauravm

Reviewed By: aaiyer

CC: hbase-eng@, liyintang

Differential Revision: https://phabricator.fb.com/D898956

Task ID: 2649976

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat Aug 24 18:18:20 2013
@@ -829,6 +829,41 @@ public final class HConstants {
   public static final String USE_MSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
   public static final boolean USE_MSLAB_DEFAULT = false;
 
+  /**
+   * Interval (ms) between successive probes of a region's last flush time
+   * while waiting for a flush, until the maximum wait time is exhausted.
+   */
+  public static final String WAIT_TIME_FOR_FLUSH_MS =
+      "hbase.hregion.flush.waittime";
+  public static final long DEFAULT_WAIT_TIME_FOR_FLUSH_MS = 100; //ms
+
+  /**
+   * The knob to turn on the ClientLocalScanner to flush and wait for the
+   * region flush to finish before it opens the store files.
+   * Set the socket timeout for the RPC appropriately for this.
+   */
+  public static final String CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT =
+      "hbase.clientlocalscanner.flush.and.wait";
+  public static final boolean DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT =
+      false;
+
+  /**
+   * The acceptable staleness (ms) of a flush. E.g. if this is set to 10s and
+   * there was a flush in the last 10s, we do not flush again.
+   */
+  public static final String CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS =
+      "hbase.clientlocalscanner.flush.acceptable.staleness";
+  public static final long DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS =
+      30000; // ms
+
+  /**
+   * Maximum time (ms) to wait for a flush to happen before forcing one.
+   */
+  public static final String CLIENT_LOCAL_SCANNER_MAX_WAITTIME_FOR_FLUSH_MS =
+      "hbase.clientlocalscanner.flush.maxwaittime";
+  public static final long DEFAULT_CLIENT_LOCAL_SCANNER_MAX_WAITTIME_FOR_FLUSH_MS
+    = 10000; // ms
+
   private HConstants() {
     // Can't be instantiated with this constructor.
   }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java Sat Aug 24 18:18:20 2013
@@ -33,7 +33,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionUtilities;
@@ -90,15 +91,44 @@ public class ClientLocalScanner extends 
     this.areHardlinksCreated = areHardlinksCreated;
   }
 
+  /**
+   * Makes sure the region was flushed recently enough before its store files
+   * are opened directly. Returns once a flush within the configured staleness
+   * window is seen; otherwise waits up to the configured maximum wait time
+   * for one to happen on its own and then forces a flush.
+   * @param info the region whose store files are about to be opened
+   * @throws IOException if the region cannot be located or flushed; the
+   * caller may retry
+   */
+  private void flushRegionAndWaitForFlush(HRegionInfo info)
+      throws IOException {
+    final Configuration conf = this.htable.getConfiguration();
+    long acceptableStalenessMs = conf.getLong(
+        HConstants.CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS,
+        HConstants.DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS);
+    long maxWaitTimeMs = conf.getLong(
+        HConstants.CLIENT_LOCAL_SCANNER_MAX_WAITTIME_FOR_FLUSH_MS,
+        HConstants.DEFAULT_CLIENT_LOCAL_SCANNER_MAX_WAITTIME_FOR_FLUSH_MS);
+    this.htable.flushRegionLazyForRow(info.getStartKey(),
+        acceptableStalenessMs, maxWaitTimeMs);
+  }
+
   /**
    * Creates and initializes the stores in the table.
    * @throws IOException
    */
-  public void openStoresOnClient() throws IOException{
+  public void openStoresOnClient() throws IOException {
     final HRegionInfo info = this.currentRegion;
     this.currentRegion.getTableDesc().setReadOnly(true);
     HTable table = this.htable;
     final Configuration conf = table.getConfiguration();
+    boolean flushAndWait = conf.getBoolean(
+        HConstants.CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT,
+        HConstants.DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT);
+    if (flushAndWait) {
+      // Flush first so the store files opened below include recent writes.
+      flushRegionAndWaitForFlush(info);
+    }
     Path rootDir = FSUtils.getRootDir(conf);
     final Path tableDir =
       HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName());
@@ -139,8 +169,9 @@ public class ClientLocalScanner extends 
    * @return true is the scanners were opened properly on the new HRegion.
    * false otherwise
    * @throws IOException
    */
-  protected boolean doRealOpenScanners(byte[] localStartKey) throws IOException {
+  protected boolean doRealOpenScanners(byte[] localStartKey)
+      throws IOException {
     try {
       this.currentRegion =
           htable.getRegionLocation(localStartKey).getRegionInfo();
@@ -194,6 +225,7 @@ public class ClientLocalScanner extends 
    * Reads the current region server for the next caching number of rows.
    * It might progress the region server in case there were no values found
    * in the current region server.
+   * @throws IOException
    */
   public void cacheNextResults() throws IOException {
     Result [] values = null;

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Sat Aug 24 18:18:20 2013
@@ -391,4 +391,25 @@ public interface HConnection extends Clo
    */
   public void resetOperationContext();
 
+  /**
+   * Makes sure the given region has been flushed recently enough.
+   * The call returns as soon as the region's last flush time is at or after
+   * (current server time - acceptableWindowForLastFlush). Otherwise it keeps
+   * checking for up to maximumWaitTime ms, hoping a flush happens on its
+   * own, and then forces a flush of the region.
+   * With maximumWaitTime == 0 a flush is forced right away unless an
+   * acceptable flush has already happened.
+   * @param regionInfo : The HRegionInfo that needs to be flushed.
+   * @param addr : The HServerAddress of the region server holding the region
+   * @param acceptableWindowForLastFlush : How old, in ms, the last flush may
+   * be (relative to the current server time) and still count as recent
+   * enough.
+   * @param maximumWaitTime : The maximum amount of time, in ms, we wait for
+   * a flush to happen on its own before forcing one.
+   * @throws IOException if the flush cannot be verified or performed; the
+   * call can be safely retried.
+   */
+  public void flushRegionAndWait(final HRegionInfo regionInfo,
+      final HServerAddress addr, long acceptableWindowForLastFlush,
+      long maximumWaitTime) throws IOException;
 }

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Aug 24 18:18:20 2013
@@ -2991,6 +2991,66 @@ public class HConnectionManager {
       }
     }
 
+    /**
+     * Waits for a recent-enough flush of the given region, forcing one if
+     * none shows up in time. A flush is recent enough when its time is at or
+     * after (the server's current time, sampled once up front, minus
+     * acceptableWindowForLastFlush). The region server's last flush time is
+     * re-read every WAIT_TIME_FOR_FLUSH_MS ms; once maximumWaitTime ms have
+     * elapsed without an acceptable flush, a flush is requested explicitly.
+     * @throws IOException on any failure; IOExceptions from the RPCs are
+     * rethrown as-is and other exceptions are wrapped. The call can be
+     * safely retried.
+     */
+    public void flushRegionAndWait(final HRegionInfo regionInfo,
+        final HServerAddress addr, long acceptableWindowForLastFlush,
+        long maximumWaitTime) throws IOException {
+      HBaseRPCOptions options = new HBaseRPCOptions();
+      try {
+        // The probe interval does not change while we wait; read it once.
+        long probeIntervalMs = this.conf.getLong(
+            HConstants.WAIT_TIME_FOR_FLUSH_MS,
+            HConstants.DEFAULT_WAIT_TIME_FOR_FLUSH_MS);
+        // The last flush time is reported by the region server, so compare
+        // it against the server's clock rather than the client's.
+        long serverTime =
+            (long)this.createCurrentTimeCallable(addr, options).call();
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        long leastAcceptableFlushTime =
+            serverTime - acceptableWindowForLastFlush;
+        while (true) {
+          long lastFlushTime = this.getRegionServerWithoutRetries(
+              new ServerCallableForBatchOps<Long>(this, addr, options) {
+                public Long call() {
+                  return server.getLastFlushTime(regionInfo.getRegionName());
+                }
+              }, true);
+          if (lastFlushTime >= leastAcceptableFlushTime) {
+            return; // A recent-enough flush already happened.
+          }
+          long elapsedTime =
+              EnvironmentEdgeManager.currentTimeMillis() - startTime;
+          if (elapsedTime >= maximumWaitTime) {
+            break; // Waited long enough; force the flush below.
+          }
+          // Sleep and hope for a flush to happen on its own, but never
+          // beyond the maximum wait time.
+          Threads.sleep(Math.min(probeIntervalMs,
+              maximumWaitTime - elapsedTime));
+        }
+        Callable<Void> flushCallable =
+            this.createFlushCallable(addr, regionInfo, serverTime, options);
+        flushCallable.call();
+      } catch (IOException e) {
+        // Already the declared exception type; do not wrap it again.
+        throw e;
+      } catch (Exception e) {
+        // Throwing IOException since the error is recoverable
+        // and the function can be safely retried.
+        throw new IOException(e);
+      }
+    }
+
     private void trackMutationsToTable(byte[] tableNameBytes,
         HRegionLocation location) throws IOException {
       String tableName = Bytes.toString(tableNameBytes);

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Sat Aug 24 18:18:20 2013
@@ -602,7 +602,7 @@ public class HTable implements HTableInt
    * Otherwise, we assume that the table hierarchy under the root directory
    * is not going to change and hence we read directly from it; use with caution
    * @return
-   * @throws IOException
+   * @throws IOException : the client can safely retry on IOException
    */
   public ResultScanner getLocalScanner(final Scan scan,
       boolean createNewHardlinks) throws IOException {
@@ -990,6 +990,36 @@ public class HTable implements HTableInt
     return writeBuffer;
   }
 
+
+  /**
+   * Flushes the row's region right away unless its last flush is recent enough.
+   * @param row : any row key inside the region to flush
+   * @param acceptableLastFlushTimeMs : how old, in ms, the last flush may be
+   * @throws IOException : The client can safely retry on IOException
+   */
+  public void flushRegionForRow(byte[] row, long acceptableLastFlushTimeMs)
+      throws IOException {
+    flushRegionLazyForRow(row, acceptableLastFlushTimeMs, 0);
+  }
+
+  /**
+   * Flushes the row's region lazily: returns once a recent-enough flush is
+   * seen, or forces one after waiting up to maxWaitTime ms for it.
+   * @param row : any row key inside the region to flush
+   * @param acceptableLastFlushTimeMs : how old, in ms, the last flush may be
+   * and still count as a successful flush
+   * @param maxWaitTime : maximum time, in ms, to wait for a flush to happen
+   * @throws IOException : The client can safely retry on exception.
+   */
+  public void flushRegionLazyForRow(byte[] row, long acceptableLastFlushTimeMs,
+      long maxWaitTime) throws IOException {
+    HRegionLocation loc = this.getRegionLocation(row);
+    HRegionInfo info = loc.getRegionInfo();
+    HServerAddress addr = loc.getServerAddress();
+    this.getConnectionAndResetOperationContext()
+      .flushRegionAndWait(info, addr, acceptableLastFlushTimeMs, maxWaitTime);
+  }
+
   /**
    * Implements the scanner interface for the HBase client.
    * If there are multiple regions in a table, this scanner will iterate

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java Sat Aug 24 18:18:20 2013
@@ -3,12 +3,19 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Threads;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -113,4 +120,106 @@ public class TestClientLocalScanner2 {
     assertTrue(rowCnt.get() == (3*numRows));
   }
 
+  @Test
+  public void testFlushAndWait() throws Exception {
+    final byte[] name = Bytes.toBytes("testFlushAndWait");
+    final HTable t = TEST_UTIL.createTable(name, FAMILY);
+    Configuration conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.loadTable(t, FAMILY);
+    HRegionLocation loc =
+        t.getRegionLocation(Bytes.toBytes("aaa"));
+    byte[] regionName = loc.getRegionInfo().getRegionName();
+    HRegionServer server =
+        TEST_UTIL.getRSWithRegion(regionName);
+    long lastFlushTime = server.getLastFlushTime(regionName);
+    // Let the server clock move past the last flush so that a new flush
+    // is distinguishable from it.
+    Threads.sleep(10);
+    // A 1 ms staleness window rejects the existing flush, and a zero wait
+    // time makes the scanner force a flush instead of waiting for one.
+    conf.setLong(
+        HConstants.CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS, 1);
+    conf.setLong(
+        HConstants.CLIENT_LOCAL_SCANNER_MAX_WAITTIME_FOR_FLUSH_MS, 0);
+    conf.setBoolean(HConstants.CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT, true);
+    try {
+      // Creates a local scanner which internally does a flush.
+      t.getLocalScanner(new Scan()).close();
+    } finally {
+      // TEST_UTIL's configuration is shared by every test; restore it.
+      conf.setBoolean(HConstants.CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT,
+          HConstants.DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT);
+    }
+    assertTrue(server.getLastFlushTime(regionName) > lastFlushTime);
+  }
+
+  @Test
+  public void testForceFlushCase() throws Exception {
+    final byte[] name = Bytes.toBytes("testForceFlushCase");
+    final HTable t = TEST_UTIL.createTable(name, FAMILY);
+    TEST_UTIL.loadTable(t, FAMILY);
+    final HRegionLocation loc =
+        t.getRegionLocation(Bytes.toBytes("aaa"));
+    final byte[] regionName = loc.getRegionInfo().getRegionName();
+    final HRegionServer server =
+        TEST_UTIL.getRSWithRegion(regionName);
+    long lastFlushTime = server.getLastFlushTime(regionName);
+    Threads.sleep(10);
+    HConnection conn =
+        HConnectionManager.getConnection(TEST_UTIL.getConfiguration());
+    // No acceptable staleness and no wait time: the call must force a flush.
+    conn.flushRegionAndWait(loc.getRegionInfo(), loc.getServerAddress(), 0, 0);
+    assertTrue(server.getLastFlushTime(regionName) > lastFlushTime);
+  }
+
+  @Test
+  public void testFlushAndVerifyPremptiveCase() throws Exception {
+    final byte[] name = Bytes.toBytes("testFlushAndVerifyPremptiveCase");
+    final HTable t = TEST_UTIL.createTable(name, FAMILY);
+    final Configuration conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.loadTable(t, FAMILY);
+    final HRegionLocation loc =
+        t.getRegionLocation(Bytes.toBytes("aaa"));
+    final HServerAddress addr = loc.getServerAddress();
+    final byte[] regionName = loc.getRegionInfo().getRegionName();
+    final HRegionServer server =
+        TEST_UTIL.getRSWithRegion(regionName);
+    final long window = 3000; // 3 sec
+    // An assertion failing inside another thread would not fail the test,
+    // so record the flusher's failure and rethrow it after joining.
+    final AtomicReference<Exception> flushFailure =
+        new AtomicReference<Exception>();
+
+    server.flushRegion(regionName);
+    Threads.sleep(window);
+    Thread preemptiveFlusherThread =
+        new Thread("PreemptivelyFlushWithinWindow") {
+      @Override
+      public void run() {
+        Threads.sleep(window/3); // 1 sec
+        try {
+          server.flushRegion(regionName);
+        } catch (Exception e) {
+          flushFailure.set(e);
+        }
+      }
+    };
+    HConnection conn = HConnectionManager.getConnection(conf);
+    long startTime = EnvironmentEdgeManager.currentTimeMillis();
+    preemptiveFlusherThread.start();
+    conn.flushRegionAndWait(loc.getRegionInfo(), addr, 0, window);
+    long elapsedTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+    preemptiveFlusherThread.join();
+    if (flushFailure.get() != null) {
+      throw flushFailure.get();
+    }
+    assertTrue("The time taken should be atleast more than "
+        + (window/3) + " ms. It took " + elapsedTime + " ms",
+        elapsedTime >= window/3); // 1 sec
+    // The call should return once the preemptive flush is seen, well
+    // before the full window elapses.
+    assertTrue("The time taken should not be more than "
+        + ((2 * window)/ 3) + " ms. It took " + elapsedTime + " ms",
+        elapsedTime <= (2 * window)/3); // 2 sec
+  }
 }