You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2012/09/22 14:52:00 UTC
svn commit: r1388800 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/
main/java/org/apache/hadoop/hbase/ipc/
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase...
Author: mbautin
Date: Sat Sep 22 12:51:59 2012
New Revision: 1388800
URL: http://svn.apache.org/viewvc?rev=1388800&view=rev
Log:
[HBASE-5783] [0.89-fb] Reduce false positives: require a retry only if a region server restarts
Author: aaiyer
Summary:
Batched upload should not abort the job if regions
were moved for benign reasons.
Test Plan:
Ran MR tests.
Added a unit test for the new behavior.
Reviewers: kannan, kranganathan, mbautin
Reviewed By: kranganathan
CC: hbase-eng@
Differential Revision: https://phabricator.fb.com/D581121
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/HRegionLocation.java Sat Sep 22 12:51:59 2012
@@ -26,6 +26,11 @@ package org.apache.hadoop.hbase;
public class HRegionLocation implements Comparable<HRegionLocation> {
private HRegionInfo regionInfo;
private HServerAddress serverAddress;
+ private long serverStartCode;
+
+ public long getServerStartCode() {
+ return serverStartCode;
+ }
/**
* Constructor
@@ -34,8 +39,13 @@ public class HRegionLocation implements
* @param serverAddress the HServerAddress for the region server
*/
public HRegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress) {
+ this(regionInfo, serverAddress, -1);
+ }
+ public HRegionLocation(HRegionInfo regionInfo, HServerAddress serverAddress,
+ long serverStartCode) {
this.regionInfo = regionInfo;
this.serverAddress = serverAddress;
+ this.serverStartCode = serverStartCode;
}
/**
@@ -43,8 +53,8 @@ public class HRegionLocation implements
*/
@Override
public String toString() {
- return "address: " + this.serverAddress.toString() + ", regioninfo: " +
- this.regionInfo;
+ return "address: " + this.serverAddress.toString() + ", serverStartCode: " +
+ this.serverStartCode + ", regioninfo: " + this.regionInfo;
}
/**
@@ -71,6 +81,7 @@ public class HRegionLocation implements
public int hashCode() {
int result = this.regionInfo.hashCode();
result ^= this.serverAddress.hashCode();
+ result ^= (int)this.serverStartCode;
return result;
}
@@ -93,6 +104,10 @@ public class HRegionLocation implements
if(result == 0) {
result = this.serverAddress.compareTo(o.serverAddress);
}
+ if(result == 0) {
+ result = (this.serverStartCode > o.serverStartCode) ? 1 :
+ (this.serverStartCode < o.serverStartCode)? -1 : 0;
+ }
return result;
}
-}
\ No newline at end of file
+}
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java Sat Sep 22 12:51:59 2012
@@ -28,6 +28,7 @@ import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -443,7 +444,7 @@ public class HConnectionManager {
// keep track of servers that have been updated for batchedLoad
// tablename -> Map
- Map<String, ConcurrentMap<HRegionInfo, HServerAddress>> batchedUploadUpdatesMap;
+ Map<String, ConcurrentMap<HRegionInfo, HRegionLocation>> batchedUploadUpdatesMap;
private int batchedUploadSoftFlushRetries;
private long batchedUploadSoftFlushTimeoutMillis;
/**
@@ -496,7 +497,7 @@ public class HConnectionManager {
this.batchedUploadSoftFlushTimeoutMillis =
conf.getLong("hbase.client.batched-upload.softflush.timeout.ms", 60000L); // 1 min
batchedUploadUpdatesMap = new ConcurrentHashMap<String,
- ConcurrentMap<HRegionInfo, HServerAddress>>();
+ ConcurrentMap<HRegionInfo, HRegionLocation>>();
}
private long getPauseTime(int tries) {
@@ -878,9 +879,16 @@ public class HConnectionManager {
}
final String serverAddress = Bytes.toString(value);
+ value = result.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.STARTCODE_QUALIFIER);
+ long serverStartCode = -1;
+ if(value != null) {
+ serverStartCode = Bytes.toLong(value);
+ }
+
// instantiate the location
HRegionLocation loc = new HRegionLocation(regionInfo,
- new HServerAddress(serverAddress));
+ new HServerAddress(serverAddress), serverStartCode);
// cache this meta entry
cacheLocation(tableName, loc);
}
@@ -1020,9 +1028,16 @@ public class HConnectionManager {
Bytes.toStringBinary(row));
}
+ value = regionInfoRow.getValue(HConstants.CATALOG_FAMILY,
+ HConstants.STARTCODE_QUALIFIER);
+ long serverStartCode = -1;
+ if(value != null) {
+ serverStartCode = Bytes.toLong(value);
+ }
// instantiate the location
location = new HRegionLocation(regionInfo,
- new HServerAddress(serverAddress));
+ new HServerAddress(serverAddress),
+ serverStartCode);
cacheLocation(tableName, location);
return location;
} catch (TableNotFoundException e) {
@@ -1721,6 +1736,22 @@ public class HConnectionManager {
occasionallyCleanupFailureInformation();
}
+ private Callable<Long> createGetServerStartCodeCallable(
+ final HServerAddress address,
+ final HBaseRPCOptions options) {
+ final HConnection connection = this;
+ return new Callable<Long>() {
+ public Long call() throws IOException {
+ return getRegionServerWithoutRetries(
+ new ServerCallableForBatchOps<Long>(connection, address, options) {
+ public Long call() throws IOException {
+ return server.getStartCode();
+ }
+ });
+ }
+ };
+ }
+
private Callable<Long> createCurrentTimeCallable(
final HServerAddress address,
final HBaseRPCOptions options) {
@@ -1738,6 +1769,7 @@ public class HConnectionManager {
}
+
private Callable<MapWritable> createGetLastFlushTimesCallable(
final HServerAddress address,
final HBaseRPCOptions options) {
@@ -1843,8 +1875,7 @@ public class HConnectionManager {
if (isGets) {
actions.addGet(regionName, (Get)row, i);
} else {
- trackMutationsToTable(tableName,
- loc.getRegionInfo(), loc.getServerAddress());
+ trackMutationsToTable(tableName, loc);
actions.mutate(regionName, (Mutation)row);
}
}
@@ -2270,9 +2301,7 @@ public class HConnectionManager {
return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
tableName, row, options) {
public Integer call() throws IOException {
- trackMutationsToTable(tableName,
- location.getRegionInfo(),
- location.getServerAddress());
+ trackMutationsToTable(tableName, location);
return server.put(location.getRegionInfo().getRegionName(), puts);
}
});
@@ -2328,9 +2357,7 @@ public class HConnectionManager {
getRegionServerWithRetries(new ServerCallable<Void>(this.c,
tableName, row, options) {
public Void call() throws IOException {
- trackMutationsToTable(tableName,
- location.getRegionInfo(),
- location.getServerAddress());
+ trackMutationsToTable(tableName, location);
server.mutateRow(location.getRegionInfo().getRegionName(),
mutations);
return null;
@@ -2356,9 +2383,7 @@ public class HConnectionManager {
return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
tableName, row, options) {
public Integer call() throws IOException {
- trackMutationsToTable(tableName,
- location.getRegionInfo(),
- location.getServerAddress());
+ trackMutationsToTable(tableName, location);
return server.delete(location.getRegionInfo().getRegionName(),
deletes);
}
@@ -2401,8 +2426,7 @@ public class HConnectionManager {
regionPuts.put(address, mput);
}
mput.add(regionName, put);
- trackMutationsToTable(tableName,
- loc.getRegionInfo(), loc.getServerAddress());
+ trackMutationsToTable(tableName, loc);
}
return new ArrayList<MultiPut>(regionPuts.values());
@@ -2690,74 +2714,97 @@ public class HConnectionManager {
@Override
public void startBatchedLoad(byte[] tableName) {
batchedUploadUpdatesMap.put(Bytes.toString(tableName),
- new ConcurrentHashMap<HRegionInfo, HServerAddress>());
+ new ConcurrentHashMap<HRegionInfo, HRegionLocation>());
}
@Override
public void endBatchedLoad(byte[] tableName, HBaseRPCOptions options) throws IOException {
- Map<HRegionInfo, HServerAddress> regionsUpdated = getRegionsUpdated(tableName);
-
- // get the current TS from the RegionServer
- Map<HRegionInfo, Long> targetTSMap = getCurrentTimeForRegions(regionsUpdated, options);
+ Map<HRegionInfo, HRegionLocation> regionsUpdated = getRegionsUpdated(tableName);
- // loop to ensure that we have flushed beyond the corresponding TS.
- int tries = 0;
- long now = EnvironmentEdgeManager.currentTimeMillis();
- long waitUntilForHardFlush = now + batchedUploadSoftFlushTimeoutMillis;
+ try {
+ // get the current TS from the RegionServer
+ Map<HRegionInfo, Long> targetTSMap = getCurrentTimeForRegions(regionsUpdated, options);
- while (tries++ < this.batchedUploadSoftFlushRetries
- && now < waitUntilForHardFlush) {
- // get the lastFlushedTS from the RegionServer. throws Exception if the region has
- // moved elsewhere
- Map<HRegionInfo, Long> flushedTSMap =
- getRegionFlushTimes(regionsUpdated, options);
-
- for (Entry<HRegionInfo, Long> entry: targetTSMap.entrySet()) {
- HRegionInfo region = entry.getKey();
- long targetTime = entry.getValue().longValue();
- long flushedTime = flushedTSMap.get(region).longValue();
- if (flushedTime > targetTime) {
- targetTSMap.remove(region);
- }
- LOG.debug("Region " + region.getEncodedName() + " was flushed at "
- + flushedTime
- + (flushedTime > targetTime
- ? ". All updates we made are already on disk."
- : ". Still waiting for updates to go to the disk.")
- + " Last update was made "
- + (flushedTime > targetTime
- ? ((flushedTime - targetTime) + " ms before flush.")
- : ((targetTime - flushedTime) + " ms after flush.")));
+ // loop to ensure that we have flushed beyond the corresponding TS.
+ int tries = 0;
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ long waitUntilForHardFlush = now + batchedUploadSoftFlushTimeoutMillis;
+
+ while (tries++ < this.batchedUploadSoftFlushRetries
+ && now < waitUntilForHardFlush) {
+ // get the lastFlushedTS from the RegionServer. throws Exception if the region has
+ // moved elsewhere
+ Map<HRegionInfo, Long> flushedTSMap =
+ getRegionFlushTimes(regionsUpdated, options);
+
+ for (Entry<HRegionInfo, Long> entry: targetTSMap.entrySet()) {
+ HRegionInfo region = entry.getKey();
+ long targetTime = entry.getValue().longValue();
+ long flushedTime = flushedTSMap.get(region).longValue();
+ if (flushedTime > targetTime) {
+ targetTSMap.remove(region);
+ }
+ LOG.debug("Region " + region.getEncodedName() + " was flushed at "
+ + flushedTime
+ + (flushedTime > targetTime
+ ? ". All updates we made are already on disk."
+ : ". Still waiting for updates to go to the disk.")
+ + " Last update was made "
+ + (flushedTime > targetTime
+ ? ((flushedTime - targetTime) + " ms before flush.")
+ : ((targetTime - flushedTime) + " ms after flush.")));
+ }
+
+ if (targetTSMap.isEmpty()) {
+ LOG.info("All regions have been flushed.");
+ break;
+ }
+ LOG.info("Try #" + tries + ". Still waiting to flush " + targetTSMap.size() + " regions.");
+ long sleepTime = getPauseTime(tries);
+ Threads.sleep(sleepTime);
+ now = EnvironmentEdgeManager.currentTimeMillis();
}
- if (targetTSMap.isEmpty()) {
- LOG.info("All regions have been flushed.");
- break;
- }
- LOG.info("Try #" + tries + ". Still waiting to flush " + targetTSMap.size() + " regions.");
- long sleepTime = getPauseTime(tries);
- Threads.sleep(sleepTime);
- now = EnvironmentEdgeManager.currentTimeMillis();
- }
+ // if we have not succeeded in flushing all, force a flush.
+ if (!targetTSMap.isEmpty()) {
+ LOG.info("Forcing regions to flush.");
+ flushRegionsAtServers(targetTSMap, regionsUpdated, options);
+ }
+
+ // check to see that the servers' current startcode is the same as the one
+ // that we have in the map. Having a different start code means that the
+ // regionserver may have restarted. To avoid data loss, in case the restart
+ // were unclean, we will throw an exception and expect the client to retry.
+ //
+ // Note that we should be doing this at the very end. After flushing.
+ // A successful region flush does not guarantee that the data was
+ // persisted correctly. A regionserver could have crashed/restarted,
+ // opened the same region again, and then flushed it.
+ checkServersAreAlive(regionsUpdated.values(), options);
- // if we have not succeded in flushing all. Force flush.
- if (!targetTSMap.isEmpty()) {
- LOG.info("Forcing regions to flush.");
- flushRegionsAtServers(targetTSMap, regionsUpdated, options);
+ clearMutationsToTable(tableName);
+ } catch (IOException e) {
+ // Any failure above (restart detected, server unreachable, flush failed)
+ // means durability of this batch cannot be confirmed; keep the cause.
+ LOG.error("Batched load on table " + Bytes.toString(tableName) + " failed", e);
+ throw new ClientSideDoNotRetryException("One or more regionservers may have restarted"
+ + " or could not be reached (" + e.getMessage() + "). Please retry the job");
}
-
- clearMutationsToTable(tableName);
}
private void trackMutationsToTable(byte[] tableNameBytes,
- HRegionInfo regionInfo, HServerAddress serverAddress) throws IOException {
+ HRegionLocation location) throws IOException {
String tableName = Bytes.toString(tableNameBytes);
- HServerAddress oldAddress = !batchedUploadUpdatesMap.containsKey(tableName) ? null
- : batchedUploadUpdatesMap.get(tableName).putIfAbsent(regionInfo, serverAddress);
- if (oldAddress != null && !oldAddress.equals(serverAddress)) {
- throw new ClientSideDoNotRetryException("Region "
- + regionInfo.getRegionNameAsString() + " moved from " + oldAddress
- + ". but has moved to." + serverAddress );
+ HRegionInfo regionInfo = location.getRegionInfo();
+ HServerAddress serverAddress = location.getServerAddress();
+ HRegionLocation oldLocation = !batchedUploadUpdatesMap.containsKey(tableName) ? null
+ : batchedUploadUpdatesMap.get(tableName).putIfAbsent(regionInfo, location);
+ if (oldLocation != null && !oldLocation.equals(location)) {
+ // The region moved since we first wrote to it. Only tolerate the move if
+ // the old server is still up with the same start code (did not restart).
+ // NOTE(review): putIfAbsent keeps oldLocation in the map, so endBatchedLoad
+ // still flushes/checks the old server for this region -- confirm intended.
+ try {
+ checkIfAlive(oldLocation);
+ } catch (IOException e) {
+ throw new ClientSideDoNotRetryException("Region "
+ + regionInfo.getRegionNameAsString() + " moved from " + oldLocation
+ + " to " + serverAddress + ". Old location not reachable or restarted: "
+ + e.getMessage());
+ }
+ }
}
@@ -2768,7 +2815,7 @@ public class HConnectionManager {
* @param tableName
* @return Map containing regionInfo, and the servers they were on.
*/
- private Map<HRegionInfo, HServerAddress> getRegionsUpdated(byte[] tableName) {
+ private Map<HRegionInfo, HRegionLocation> getRegionsUpdated(byte[] tableName) {
return batchedUploadUpdatesMap.get(Bytes.toString(tableName));
}
@@ -2785,6 +2832,72 @@ public class HConnectionManager {
}
/**
+ * Checks that the region server hosting the given location is reachable and
+ * still reports the start code recorded in that location, i.e. that it has
+ * not restarted since the location was looked up.
+ *
+ * @param location -- region location whose server should be checked
+ * @throws IOException if the server could not be reached, or its current
+ * start code differs from the one recorded in the location.
+ */
+ private void checkIfAlive(HRegionLocation location) throws IOException {
+ checkServersAreAlive(Collections.singletonList(location), HBaseRPCOptions.DEFAULT);
+ }
+
+ /**
+ * Checks that every server that hosted the given regions is reachable and
+ * still has the start code recorded in the corresponding location. A
+ * different start code means the server restarted, so writes that were
+ * not yet persisted may have been lost.
+ * @param regionsContacted -- collection of region locations
+ * @param options -- rpc options to use
+ * @throws IOException if (a) a server could not be contacted, or (b) a
+ * server's current start code differs from the recorded one.
+ */
+ private void checkServersAreAlive(
+ Collection<HRegionLocation> regionsContacted,
+ HBaseRPCOptions options) throws IOException {
+
+ // ask each distinct server for its start code, in parallel
+ Map<HServerAddress, Future<Long>> futures =
+ new HashMap<HServerAddress, Future<Long>>();
+ for (HRegionLocation location: regionsContacted) {
+ HServerAddress server = location.getServerAddress();
+ if (!futures.containsKey(server)) {
+ futures.put(server, HTable.multiActionThreadPool.submit(
+ createGetServerStartCodeCallable(server, options)));
+ }
+ }
+
+ // collect the start codes
+ Map<HServerAddress, Long> currentServerStartCode =
+ new HashMap<HServerAddress, Long>();
+ for (Entry<HServerAddress, Future<Long>> entry: futures.entrySet()) {
+ HServerAddress server = entry.getKey();
+ try {
+ currentServerStartCode.put(server, entry.getValue().get());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+ throw new IOException("Interrupted: Could not get start code from server "
+ + server);
+ } catch (ExecutionException e) {
+ throw new IOException("Could not get start code from " + server,
+ e.getCause());
+ }
+ }
+
+ // compare against the start code recorded when each region was looked up.
+ // NOTE(review): locations built without a start code carry -1 and will
+ // never match -- confirm META always provides STARTCODE for these rows.
+ for (HRegionLocation location: regionsContacted) {
+ HServerAddress server = location.getServerAddress();
+ long expectedStartCode = location.getServerStartCode();
+ long startCode = currentServerStartCode.get(server).longValue();
+ if (startCode != expectedStartCode) {
+ LOG.debug("Current startcode for server " + server + " is " + startCode
+ + " looking for " + location.toString());
+ throw new IOException("RegionServer restarted.");
+ }
+ }
+ }
+
+
+ /**
* Get the current time in milliseconds at the server for each
* of the regions in the map.
* @param regionsContacted -- map of regions to server address
@@ -2794,7 +2907,7 @@ public class HConnectionManager {
* have moved from the location indicated in regionsContacted.
*/
private Map<HRegionInfo, Long> getCurrentTimeForRegions(
- Map<HRegionInfo, HServerAddress> regionsContacted,
+ Map<HRegionInfo, HRegionLocation> regionsContacted,
HBaseRPCOptions options) throws IOException {
Map<HRegionInfo, Long> currentTimeForRegions =
@@ -2806,9 +2919,12 @@ public class HConnectionManager {
new HashMap<HServerAddress,Future<Long>>();
// get flush times from that server
- for (HServerAddress server: regionsContacted.values()) {
- futures.put(server, HTable.multiActionThreadPool.submit(
- createCurrentTimeCallable(server, options)));
+ for (HRegionLocation location: regionsContacted.values()) {
+ HServerAddress server = location.getServerAddress();
+ if (!futures.containsKey(server)) {
+ futures.put(server, HTable.multiActionThreadPool.submit(
+ createCurrentTimeCallable(server, options)));
+ }
}
// populate serverTimes;
@@ -2826,7 +2942,7 @@ public class HConnectionManager {
}
for(HRegionInfo region: regionsContacted.keySet()) {
- HServerAddress serverToLookFor = regionsContacted.get(region);
+ HServerAddress serverToLookFor = regionsContacted.get(region).getServerAddress();
Long currentTime = currentTimeAtServers.get(serverToLookFor);
currentTimeForRegions.put(region, currentTime);
}
@@ -2837,13 +2953,13 @@ public class HConnectionManager {
* Ask the regionservers to flush the given regions, if they have not flushed
* past the desired time.
* @param targetTSMap -- map of regions to the desired flush time
- * @param regionsContacted -- map of regions to server address
+ * @param regionsUpdated -- map of regions to server address
* @param options -- rpc options to use
* @throws IOException if (a) could not talk to the server, or (b) any of the regions
* have moved from the location indicated in regionsContacted.
*/
private void flushRegionsAtServers(Map<HRegionInfo, Long> targetTSMap,
- Map<HRegionInfo, HServerAddress> regionsContacted,
+ Map<HRegionInfo, HRegionLocation> regionsUpdated,
HBaseRPCOptions options) throws IOException {
Map<HRegionInfo, Future<Void>> futures =
@@ -2851,7 +2967,7 @@ public class HConnectionManager {
// get flush times from that server
for (HRegionInfo region: targetTSMap.keySet()) {
- HServerAddress server = regionsContacted.get(region);
+ HServerAddress server = regionsUpdated.get(region).getServerAddress();
long targetFlushTime = targetTSMap.get(region).longValue();
LOG.debug("forcing a flush at " + server.getHostname());
@@ -2885,14 +3001,14 @@ public class HConnectionManager {
/**
* Ask the regionservers for the last flushed time for each regions.
- * @param regionsContacted -- map of regions to server address
+ * @param regionsUpdated -- map of regions to server address
* @param options -- rpc options to use
* @return Map from the region to the region's last flushed time.
* @throws IOException if (a) could not talk to the server, or (b) any of the regions
* have moved from the location indicated in regionsContacted.
*/
private Map<HRegionInfo, Long> getRegionFlushTimes(
- Map<HRegionInfo, HServerAddress> regionsContacted,
+ Map<HRegionInfo, HRegionLocation> regionsUpdated,
HBaseRPCOptions options) throws IOException {
Map<HRegionInfo, Long> regionFlushTimesMap =
@@ -2904,9 +3020,12 @@ public class HConnectionManager {
new HashMap<HServerAddress, Future<MapWritable>>();
// get flush times from that server
- for (HServerAddress server: regionsContacted.values()) {
- futures.put(server, HTable.multiActionThreadPool.submit(
- createGetLastFlushTimesCallable(server, options)));
+ for (HRegionLocation location: regionsUpdated.values()) {
+ HServerAddress server = location.getServerAddress();
+ if (!futures.containsKey(server)) {
+ futures.put(server, HTable.multiActionThreadPool.submit(
+ createGetLastFlushTimesCallable(server, options)));
+ }
}
boolean toCancel = false;
@@ -2930,8 +3049,8 @@ public class HConnectionManager {
throw toThrow;
}
- for(HRegionInfo region: regionsContacted.keySet()) {
- HServerAddress serverToLookFor = regionsContacted.get(region);
+ for(HRegionInfo region: regionsUpdated.keySet()) {
+ HServerAddress serverToLookFor = regionsUpdated.get(region).getServerAddress();
MapWritable serverMap = rsRegionTimes.get(serverToLookFor);
LongWritable lastFlushedTime = (LongWritable) serverMap.get(
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Sat Sep 22 12:51:59 2012
@@ -109,6 +109,12 @@ public interface HRegionInterface extend
public long getCurrentTimeMillis();
/**
+ * Gets the current startCode at the region server
+ * @return startCode -- time in milli seconds when the regionserver started.
+ */
+ public long getStartCode();
+
+ /**
* Get a list of store files for a particular CF in a particular region
* @param region name
* @param CF name
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Sat Sep 22 12:51:59 2012
@@ -2897,6 +2897,11 @@ public class HRegionServer implements HR
}
@Override
+ public long getStartCode() {
+ return this.serverInfo.getStartCode();
+ }
+
+ @Override
public long getCurrentTimeMillis() {
return EnvironmentEdgeManager.currentTimeMillis();
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/client/TestBatchedUpload.java Sat Sep 22 12:51:59 2012
@@ -67,7 +67,7 @@ public class TestBatchedUpload {
// start batch processing
// do a bunch of puts
// finish batch. Check for Exceptions.
- int attempts = writeData(ht, NUM_ROWS);
+ int attempts = writeData(ht, NUM_ROWS, true);
assert(attempts > 1);
readData(ht, NUM_ROWS);
@@ -75,7 +75,39 @@ public class TestBatchedUpload {
ht.close();
}
- public int writeData(HTable table, long numRows) throws IOException {
+ @Test
+ /*
+ * Test to make sure that if a region moves benignly, and both
+ * the source and dest region servers are alive, then the batch
+ * should succeed.
+ */
+ public void testBatchedUploadWithRegionMoves() throws Exception {
+ byte [] TABLE = Bytes.toBytes("testBatchedUploadWithRegionMoves");
+ int NUM_REGIONS = 10;
+ HTable ht = TEST_UTIL.createTable(TABLE, new byte[][]{FAMILY},
+ 3, Bytes.toBytes("aaaaa"), Bytes.toBytes("zzzzz"), NUM_REGIONS);
+ int NUM_ROWS = 1000;
+
+ // start batch processing
+ // do a bunch of puts
+ // finish batch. Check for Exceptions.
+ int attempts = writeData(ht, NUM_ROWS, false);
+ assert(attempts == 1);
+
+ readData(ht, NUM_ROWS);
+
+ ht.close();
+ }
+
+ /**
+ * Write data to the htable. While randomly killing/shutting down regionservers.
+ * @param table
+ * @param numRows
+ * @param killRS -- true to kill the RS. false to do a clean shutdown.
+ * @return number of attempts to complete the batch.
+ * @throws IOException
+ */
+ public int writeData(HTable table, long numRows, boolean killRS) throws IOException {
int attempts = 0;
int MAX = 10;
MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
@@ -109,9 +141,15 @@ public class TestBatchedUpload {
numRS = cluster.getRegionServerThreads().size();
int idxToKill = Math.abs(rand.nextInt()) % numRS;
LOG.debug("Try " + attempts + " written Puts : " + i);
- LOG.info("Randomly killing region server " + idxToKill + ". Got probability " + prob
- + " < " + killProb);
- cluster.abortRegionServer(idxToKill);
+ if (killRS) {
+ LOG.info("Randomly killing region server " + idxToKill
+ + ". Got probability " + prob + " < " + killProb);
+ cluster.abortRegionServer(idxToKill);
+ } else { // clean shutdown
+ LOG.info("Randomly shutting down region server " + idxToKill
+ + ". Got probability " + prob + " < " + killProb);
+ cluster.stopRegionServer(idxToKill);
+ }
// keep decreasing the probability of killing the RS
killProb = killProb / 2;
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/LoadTestTool.java Sat Sep 22 12:51:59 2012
@@ -88,6 +88,8 @@ public class LoadTestTool extends Abstra
private static final String OPT_KEY_WINDOW = "key_window";
private static final String OPT_WRITE = "write";
+ private static final String OPT_BATCHED_WRITES = "batch";
+ private static final String OPT_BATCHED_WRITES_CNT = "batch_cnt";
private static final String OPT_MAX_READ_ERRORS = "max_read_errors";
private static final String OPT_MULTIPUT = "multiput";
private static final String OPT_NUM_KEYS = "num_keys";
@@ -133,6 +135,10 @@ public class LoadTestTool extends Abstra
private int verifyPercent;
private int profilePercent = 0;
+ private boolean isBatched;
+
+ private int batchSize = Integer.MAX_VALUE; // used when batch_cnt is not given; must be > 0
+
private String[] splitColonSeparated(String option,
int minNumCols, int maxNumCols) {
String optVal = cmd.getOptionValue(option);
@@ -197,6 +203,8 @@ public class LoadTestTool extends Abstra
"without port numbers");
addOptWithArg(OPT_TABLE_NAME, "The name of the table to read or write");
addOptWithArg(OPT_WRITE, OPT_USAGE_LOAD);
+ addOptWithArg(OPT_BATCHED_WRITES, "Use batched writes (with WAL)");
+ addOptWithArg(OPT_BATCHED_WRITES_CNT, "Size of a batch (if using batched writes)");
addOptWithArg(OPT_READ, OPT_USAGE_READ);
addOptWithArg(OPT_BLOOM, OPT_USAGE_BLOOM);
addOptWithArg(OPT_COMPRESSION, OPT_USAGE_COMPRESSION);
@@ -261,8 +269,15 @@ public class LoadTestTool extends Abstra
}
isMultiPut = cmd.hasOption(OPT_MULTIPUT);
+ isBatched = cmd.hasOption(OPT_BATCHED_WRITES);
+ if (cmd.hasOption(OPT_BATCHED_WRITES_CNT)) {
+ batchSize = parseInt(cmd.getOptionValue(OPT_BATCHED_WRITES_CNT),
+ 1, Integer.MAX_VALUE);
+ }
System.out.println("Multi-puts: " + isMultiPut);
+ System.out.println("isBatched: " + isBatched
+ + (isBatched ? " batch size " + batchSize : "" ));
System.out.println("Columns per key: " + minColsPerKey + ".."
+ maxColsPerKey);
System.out.println("Data size per column: " + minColDataSize + ".."
@@ -345,6 +360,8 @@ public class LoadTestTool extends Abstra
writerThreads = new MultiThreadedWriter(conf, tableName, COLUMN_FAMILY,
profilePercent, this.txCompression, this.rxCompression);
writerThreads.setMultiPut(isMultiPut);
+ writerThreads.setBatching(isBatched);
+ writerThreads.setBatchSize(batchSize);
writerThreads.setColumnsPerKey(minColsPerKey, maxColsPerKey);
writerThreads.setDataSize(minColDataSize, maxColDataSize);
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java?rev=1388800&r1=1388799&r2=1388800&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/util/MultiThreadedWriter.java Sat Sep 22 12:51:59 2012
@@ -83,6 +83,9 @@ public class MultiThreadedWriter extends
/** RPC compression */
private Compression.Algorithm txCompression;
private Compression.Algorithm rxCompression;
+
+ private boolean isBatched = false;
+ private int batchSize = Integer.MAX_VALUE;
public MultiThreadedWriter(Configuration conf, byte[] tableName,
byte[] columnFamily) {
@@ -156,6 +159,10 @@ public class MultiThreadedWriter extends
public void run() {
try {
long rowKey;
+ int count = 0;
+ if (isBatched) {
+ table.startBatchedLoad();
+ }
while ((rowKey = nextKeyToInsert.getAndIncrement()) < endKey) {
long numColumns = minColumnsPerKey + Math.abs(random.nextLong())
% (maxColumnsPerKey - minColumnsPerKey);
@@ -171,6 +178,26 @@ public class MultiThreadedWriter extends
if (trackInsertedKeys) {
insertedKeys.add(rowKey);
}
+
+ if (isBatched && ++count % batchSize == 0) {
+ try {
+ table.endBatchedLoad();
+ LOG.info("Count so far " + count + ". Batch ended");
+ } catch (IOException e) {
+ // log and continue.
+ LOG.info("Count so far " + count + ". Batch failed ", e);
+ }
+ table.startBatchedLoad();
+ }
+ }
+ if (isBatched) { // always end the last (possibly partial) batch; do not bump count here
+ try {
+ table.endBatchedLoad();
+ LOG.info("Count so far " + count + ". Batch ended");
+ } catch (IOException e) {
+ // log and continue.
+ LOG.info("Count so far " + count + ". Batch failed ", e);
+ }
}
} finally {
try {
@@ -330,4 +357,12 @@ public class MultiThreadedWriter extends
trackInsertedKeys = enable;
}
+ public void setBatching(boolean isBatched) {
+ this.isBatched = isBatched;
+ }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
}