You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2013/08/24 20:18:20 UTC
svn commit: r1517172 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/
test/java/org/apache/hadoop/hbase/client/
Author: liyin
Date: Sat Aug 24 18:18:20 2013
New Revision: 1517172
URL: http://svn.apache.org/r1517172
Log:
[HBASE-9325] Add functionality to flush and wait and use it in ClientLocalScanner
Author: manukranthk
Summary: Add a functionality in HConnectionManager to perform flushAndWait for flush.
Test Plan: Unit Test
Reviewers: adela, aaiyer, gauravm
Reviewed By: aaiyer
CC: hbase-eng@, liyintang
Differential Revision: https://phabricator.fb.com/D898956
Task ID: 2649976
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HConstants.java Sat Aug 24 18:18:20 2013
@@ -829,6 +829,41 @@ public final class HConstants {
public static final String USE_MSLAB_KEY = "hbase.hregion.memstore.mslab.enabled";
public static final boolean USE_MSLAB_DEFAULT = false;
+ /**
+ * This wait time is used to periodically probe until
+ * we exhaust the timeout in the window
+ */
+ public static final String WAIT_TIME_FOR_FLUSH_MS =
+ "hbase.hregion.flush.waittime";
+ public static final long DEFAULT_WAIT_TIME_FOR_FLUSH_MS = 100; //ms
+
+ /**
+ * The knob to turn on the ClientLocalScanner to flush and wait for the
+ * region flush to finish before it opens the store files.
+ * Set the socket timeout for the RPC appropriately for this.
+ */
+ public static final String CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT =
+ "hbase.clientlocalscanner.flush.and.wait";
+ public static final boolean DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT =
+ false;
+
+ /**
+ * The acceptable staleness of a flush. Say if this value is set to 10s,
+ * if there was a flush in the last 10s, we would not flush again.
+ */
+ public static final String CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS =
+ "hbase.clientlocalscanner.flush.acceptable.staleness";
+ public static final long DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS =
+ 30000; // ms
+
+ /**
+ * The extra wait time that we wait for the flush to take place.
+ */
+ public static final String CLIENT_LOCAL_SCANNER_MAX_WAITTIME_FOR_FLUSH_MS =
+ "hbase.clientlocal.scanner.flush.maxwaittime";
+ public static final int DEFAULT_CLIENT_LOCAL_SCANNER_MAX_WAITTIME_FOR_FLUSH_MS
+ = 10000; // ms
+
private HConstants() {
// Can't be instantiated with this constructor.
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/ClientLocalScanner.java Sat Aug 24 18:18:20 2013
@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionUtilities;
@@ -90,15 +92,36 @@ public class ClientLocalScanner extends
this.areHardlinksCreated = areHardlinksCreated;
}
+ private void flushRegionAndWaitForFlush(HRegionInfo info, HServerAddress addr)
+ throws IOException
+ {
+ HTable table = this.htable;
+ try {
+ long window = table.getConfiguration().getLong(
+ HConstants.CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS,
+ HConstants.DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS);
+ table.flushRegionForRow(info.getStartKey(), window);
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
+ }
+
/**
* Creates and initializes the stores in the table.
* @throws IOException
*/
- public void openStoresOnClient() throws IOException{
+ public void openStoresOnClient() throws IOException {
final HRegionInfo info = this.currentRegion;
this.currentRegion.getTableDesc().setReadOnly(true);
HTable table = this.htable;
final Configuration conf = table.getConfiguration();
+ boolean flushAndWait = conf.getBoolean(
+ HConstants.CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT,
+ HConstants.DEFAULT_CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT);
+ if (flushAndWait) {
+ flushRegionAndWaitForFlush(this.currentRegion,
+ table.getRegionsInfo().get(info));
+ }
Path rootDir = FSUtils.getRootDir(conf);
final Path tableDir =
HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName());
@@ -139,8 +162,10 @@ public class ClientLocalScanner extends
* @return true is the scanners were opened properly on the new HRegion.
* false otherwise
* @throws IOException
+ * @throws Exception
*/
- protected boolean doRealOpenScanners(byte[] localStartKey) throws IOException {
+ protected boolean doRealOpenScanners(byte[] localStartKey)
+ throws IOException {
try {
this.currentRegion =
htable.getRegionLocation(localStartKey).getRegionInfo();
@@ -194,6 +219,7 @@ public class ClientLocalScanner extends
* Reads the current region server for the next caching number of rows.
* It might progress the region server in case there were no values found
* in the current region server.
+ * @throws IOException
*/
public void cacheNextResults() throws IOException {
Result [] values = null;
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnection.java Sat Aug 24 18:18:20 2013
@@ -391,4 +391,25 @@ public interface HConnection extends Clo
*/
public void resetOperationContext();
+ /**
+ * Returns if the most recent flush on this region happens in the window
+ * [current server time - acceptableWindowForLastFlush,
+ * current server time + maximumWaitTime).
+ * Else it will force flush
+ * i.e. the call will return immediately if the last flush happened
+ * in the last acceptableWindowForLastFlush ms or
+ * it will wait until maximumWaitTime ms for a flush to happen.
+ * If there was no flush of that sort, it will force a flush.
+ * @param regionInfo : The HRegionInfo that needs to be flushed.
+ * @param addr : The HServerAddress of the region server holding the region
+ * @param acceptableWindowForLastFlush : The acceptable window for the
+ * last flush. i.e. if there was a flush between current time and
+ * current time - acceptableWindowForLastFlush,
+ * we consider that flush to be good enough.
+ * @param maximumWaitTime : The maximum amount of time we wait for the
+ * @throws IOException
+ */
+ public void flushRegionAndWait(final HRegionInfo regionInfo,
+ final HServerAddress addr, long acceptableWindowForLastFlush,
+ long maximumWaitTime) throws IOException;
}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Aug 24 18:18:20 2013
@@ -2991,6 +2991,49 @@ public class HConnectionManager {
}
}
+ public void flushRegionAndWait(final HRegionInfo regionInfo,
+ final HServerAddress addr, long acceptableWindowForLastFlush,
+ long maximumWaitTime) throws IOException {
+ HBaseRPCOptions options = new HBaseRPCOptions();
+ try {
+ long serverTime =
+ (long)this.createCurrentTimeCallable(addr, options).call();
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ long leastAcceptableFlushTime =
+ serverTime - acceptableWindowForLastFlush;
+ boolean forceFlush = false;
+ boolean waitForFlushToHappen = true;
+ do {
+ long lastFlushTime = this.getRegionServerWithoutRetries(
+ new ServerCallableForBatchOps<Long>(this, addr, options) {
+ public Long call() {
+ return server.getLastFlushTime(regionInfo.getRegionName());
+ }
+ }, true);
+ if (lastFlushTime >= leastAcceptableFlushTime) break; // flush happened
+ long elapsedTime =
+ EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ if (elapsedTime >= maximumWaitTime) {
+ forceFlush = true;
+ break;
+ }
+ // Sleep and hope for flush to happen
+ Threads.sleep(
+ this.conf.getLong(HConstants.WAIT_TIME_FOR_FLUSH_MS,
+ HConstants.DEFAULT_WAIT_TIME_FOR_FLUSH_MS));
+ } while (waitForFlushToHappen);
+ if (forceFlush) {
+ Callable<Void> flushCallable =
+ this.createFlushCallable(addr, regionInfo, serverTime, options);
+ flushCallable.call();
+ }
+ } catch (Exception e) {
+ // Throwing IOException since the error is recoverable
+ // and the function can be safely retried.
+ throw new IOException(e);
+ }
+ }
+
private void trackMutationsToTable(byte[] tableNameBytes,
HRegionLocation location) throws IOException {
String tableName = Bytes.toString(tableNameBytes);
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Sat Aug 24 18:18:20 2013
@@ -602,7 +602,7 @@ public class HTable implements HTableInt
* Otherwise, we assume that the table hierarchy under the root directory
* is not going to change and hence we read directly from it; use with caution
* @return
- * @throws IOException
+ * @throws IOException : retry on IOException
*/
public ResultScanner getLocalScanner(final Scan scan,
boolean createNewHardlinks) throws IOException {
@@ -990,6 +990,36 @@ public class HTable implements HTableInt
return writeBuffer;
}
+
+ /**
+ * Flushes a region for the give row.
+ * @param row
+ * @param acceptableLastFlushTimeMs : The acceptable last flush time in ms
+ * @throws IOException : The client can safely retry on IOException
+ */
+ public void flushRegionForRow(byte[] row, long acceptableLastFlushTimeMs)
+ throws IOException {
+ flushRegionLazyForRow(row, acceptableLastFlushTimeMs, 0);
+ }
+
+ /**
+ * Flushes a region for the given row.
+ * @param row
+ * @param acceptableLastFlushTimeMs : This is how old the last flush can be
+ * for it to be considered as a successful
+ * @param maxWaitTime : The maximum amount of time we should wait
+ * hoping for a flush to happen
+ * @throws IOException : The client can safely retry on exception.
+ */
+ public void flushRegionLazyForRow(byte[] row, long acceptableLastFlushTimeMs,
+ long maxWaitTime) throws IOException {
+ HRegionLocation loc = this.getRegionLocation(row);
+ HRegionInfo info = loc.getRegionInfo();
+ HServerAddress addr = loc.getServerAddress();
+ this.getConnectionAndResetOperationContext()
+ .flushRegionAndWait(info, addr, acceptableLastFlushTimeMs, maxWaitTime);
+ }
+
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java?rev=1517172&r1=1517171&r2=1517172&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestClientLocalScanner2.java Sat Aug 24 18:18:20 2013
@@ -3,12 +3,20 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.*;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -113,4 +121,70 @@ public class TestClientLocalScanner2 {
assertTrue(rowCnt.get() == (3*numRows));
}
+ @Test
+ public void testFlushAndWait() throws Exception {
+ final byte[] name = Bytes.toBytes("testFlushAndWait");
+ final HTable t = TEST_UTIL.createTable(name, FAMILY);
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setInt(HConstants.CLIENT_LOCAL_SCANNER_FLUSH_ACCEPTABLE_STALENESS_MS,
+ 1);
+ TEST_UTIL.loadTable(t, FAMILY);
+ HRegionLocation loc =
+ t.getRegionLocation(Bytes.toBytes("aaa"));
+ byte[] regionName = loc.getRegionInfo().getRegionName();
+ HRegionServer server =
+ TEST_UTIL.getRSWithRegion(regionName);
+ long lastFlushTime = server.getLastFlushTime(regionName);
+ conf.setBoolean(HConstants.CLIENT_LOCAL_SCANNER_FLUSH_AND_WAIT, true);
+ // Creates a local scanner which internally does a flush.
+ t.getLocalScanner(new Scan());
+ assertTrue(server.getLastFlushTime(regionName) >= lastFlushTime);
+ }
+
+ @Test
+ public void testForceFlushCase() {
+
+ }
+
+ @Test
+ public void testFlushAndVerifyPremptiveCase() throws Exception {
+ final byte[] name = Bytes.toBytes("testFlushAndVerifyPremptiveCase");
+ final HTable t = TEST_UTIL.createTable(name, FAMILY);
+ final Configuration conf = TEST_UTIL.getConfiguration();
+ TEST_UTIL.loadTable(t, FAMILY);
+ final HRegionLocation loc =
+ t.getRegionLocation(Bytes.toBytes("aaa"));
+ final HServerAddress addr = loc.getServerAddress();
+ final byte[] regionName = loc.getRegionInfo().getRegionName();
+ final HRegionServer server =
+ TEST_UTIL.getRSWithRegion(regionName);
+ final long window = 3000; // 3 sec
+
+ server.flushRegion(regionName);
+ Threads.sleep(window);
+ Thread preemptiveFlusherThread =
+ new Thread("PreemptivelyFlushWithinWindow") {
+ @Override
+ public void run() {
+ Threads.sleep(window/3); // 1 sec
+ try {
+ server.flushRegion(regionName);
+ } catch (Exception e) {
+ assertTrue(false);
+ }
+ }
+ };
+ HConnection conn = HConnectionManager.getConnection(conf);
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ preemptiveFlusherThread.start();
+ conn.flushRegionAndWait(loc.getRegionInfo(), addr, 0, window);
+ long elapsedTime = EnvironmentEdgeManager.currentTimeMillis() - startTime;
+ assertTrue("The time taken should be atleast more than "
+ + (window/3) + " ms. It took " + elapsedTime + " ms",
+ elapsedTime >= window/3); // 1 sec
+ // The call should not be waiting untill window ms.
+ assertTrue("The time taken should not be more than "
+ + ((2 * window)/ 3) + " ms. It took " + elapsedTime + " ms",
+ elapsedTime <= (2 * window)/3); // 2 sec
+ }
}