You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/04/26 22:15:14 UTC
svn commit: r1590291 - in /hbase/trunk/hbase-server: pom.xml
src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
Author: tedyu
Date: Sat Apr 26 20:15:14 2014
New Revision: 1590291
URL: http://svn.apache.org/r1590291
Log:
HBASE-11083 ExportSnapshot should provide capability to limit bandwidth consumption
Modified:
hbase/trunk/hbase-server/pom.xml
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
Modified: hbase/trunk/hbase-server/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/pom.xml?rev=1590291&r1=1590290&r2=1590291&view=diff
==============================================================================
--- hbase/trunk/hbase-server/pom.xml (original)
+++ hbase/trunk/hbase-server/pom.xml Sat Apr 26 20:15:14 2014
@@ -308,6 +308,11 @@
<artifactId>commons-collections</artifactId>
</dependency>
<dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-distcp</artifactId>
+ <version>${hadoop-two.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-hadoop-compat</artifactId>
</dependency>
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java?rev=1590291&r1=1590290&r2=1590291&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java Sat Apr 26 20:15:14 2014
@@ -18,8 +18,10 @@
package org.apache.hadoop.hbase.snapshot;
+import java.io.BufferedInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
@@ -62,6 +64,7 @@ import org.apache.hadoop.mapreduce.Mappe
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.tools.util.ThrottledInputStream;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -86,6 +89,7 @@ public final class ExportSnapshot extend
private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
+ private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
@@ -225,7 +229,11 @@ public final class ExportSnapshot extend
}
}
- FSDataInputStream in = openSourceFile(context, inputPath);
+ InputStream in = openSourceFile(context, inputPath);
+ int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
+ if (Integer.MAX_VALUE != bandwidthMB) {
+ in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
+ }
try {
context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
@@ -298,7 +306,7 @@ public final class ExportSnapshot extend
}
private void copyData(final Context context,
- final Path inputPath, final FSDataInputStream in,
+ final Path inputPath, final InputStream in,
final Path outputPath, final FSDataOutputStream out,
final long inputFileSize)
throws IOException {
@@ -585,7 +593,8 @@ public final class ExportSnapshot extend
private void runCopyJob(final Path inputRoot, final Path outputRoot,
final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
final String filesUser, final String filesGroup, final int filesMode,
- final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
+ final int mappers, final int bandwidthMB)
+ throws IOException, InterruptedException, ClassNotFoundException {
Configuration conf = getConf();
if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@@ -594,6 +603,7 @@ public final class ExportSnapshot extend
conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
conf.set(CONF_INPUT_ROOT, inputRoot.toString());
conf.setInt("mapreduce.job.maps", mappers);
+ conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
Job job = new Job(conf);
job.setJobName("ExportSnapshot");
@@ -655,6 +665,7 @@ public final class ExportSnapshot extend
String filesGroup = null;
String filesUser = null;
Path outputRoot = null;
+ int bandwidthMB = Integer.MAX_VALUE;
int filesMode = 0;
int mappers = 0;
@@ -681,6 +692,8 @@ public final class ExportSnapshot extend
filesUser = args[++i];
} else if (cmd.equals("-chgroup")) {
filesGroup = args[++i];
+ } else if (cmd.equals("-bandwidth")) {
+ bandwidthMB = Integer.parseInt(args[++i]);
} else if (cmd.equals("-chmod")) {
filesMode = Integer.parseInt(args[++i], 8);
} else if (cmd.equals("-overwrite")) {
@@ -773,7 +786,7 @@ public final class ExportSnapshot extend
LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
} else {
runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
- filesUser, filesGroup, filesMode, mappers);
+ filesUser, filesGroup, filesMode, mappers, bandwidthMB);
}
// Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>