You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/04/26 22:15:14 UTC

svn commit: r1590291 - in /hbase/trunk/hbase-server: pom.xml src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java

Author: tedyu
Date: Sat Apr 26 20:15:14 2014
New Revision: 1590291

URL: http://svn.apache.org/r1590291
Log:
HBASE-11083 ExportSnapshot should provide capability to limit bandwidth consumption


Modified:
    hbase/trunk/hbase-server/pom.xml
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java

Modified: hbase/trunk/hbase-server/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/pom.xml?rev=1590291&r1=1590290&r2=1590291&view=diff
==============================================================================
--- hbase/trunk/hbase-server/pom.xml (original)
+++ hbase/trunk/hbase-server/pom.xml Sat Apr 26 20:15:14 2014
@@ -308,6 +308,11 @@
       <artifactId>commons-collections</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <version>${hadoop-two.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
     </dependency>

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java?rev=1590291&r1=1590290&r2=1590291&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java Sat Apr 26 20:15:14 2014
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.hbase.snapshot;
 
+import java.io.BufferedInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -62,6 +64,7 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.tools.util.ThrottledInputStream;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -86,6 +89,7 @@ public final class ExportSnapshot extend
   private static final String CONF_INPUT_ROOT = "snapshot.export.input.root";
   private static final String CONF_BUFFER_SIZE = "snapshot.export.buffer.size";
   private static final String CONF_MAP_GROUP = "snapshot.export.default.map.group";
+  private static final String CONF_BANDWIDTH_MB = "snapshot.export.map.bandwidth.mb";
 
   static final String CONF_TEST_FAILURE = "test.snapshot.export.failure";
   static final String CONF_TEST_RETRY = "test.snapshot.export.failure.retry";
@@ -225,7 +229,11 @@ public final class ExportSnapshot extend
         }
       }
 
-      FSDataInputStream in = openSourceFile(context, inputPath);
+      InputStream in = openSourceFile(context, inputPath);
+      int bandwidthMB = context.getConfiguration().getInt(CONF_BANDWIDTH_MB, 100);
+      if (Integer.MAX_VALUE != bandwidthMB) {
+        in = new ThrottledInputStream(new BufferedInputStream(in), bandwidthMB * 1024 * 1024);
+      }
       try {
         context.getCounter(Counter.BYTES_EXPECTED).increment(inputStat.getLen());
 
@@ -298,7 +306,7 @@ public final class ExportSnapshot extend
     }
 
     private void copyData(final Context context,
-        final Path inputPath, final FSDataInputStream in,
+        final Path inputPath, final InputStream in,
         final Path outputPath, final FSDataOutputStream out,
         final long inputFileSize)
         throws IOException {
@@ -585,7 +593,8 @@ public final class ExportSnapshot extend
   private void runCopyJob(final Path inputRoot, final Path outputRoot,
       final List<Pair<Path, Long>> snapshotFiles, final boolean verifyChecksum,
       final String filesUser, final String filesGroup, final int filesMode,
-      final int mappers) throws IOException, InterruptedException, ClassNotFoundException {
+      final int mappers, final int bandwidthMB)
+          throws IOException, InterruptedException, ClassNotFoundException {
     Configuration conf = getConf();
     if (filesGroup != null) conf.set(CONF_FILES_GROUP, filesGroup);
     if (filesUser != null) conf.set(CONF_FILES_USER, filesUser);
@@ -594,6 +603,7 @@ public final class ExportSnapshot extend
     conf.set(CONF_OUTPUT_ROOT, outputRoot.toString());
     conf.set(CONF_INPUT_ROOT, inputRoot.toString());
     conf.setInt("mapreduce.job.maps", mappers);
+    conf.setInt(CONF_BANDWIDTH_MB, bandwidthMB);
 
     Job job = new Job(conf);
     job.setJobName("ExportSnapshot");
@@ -655,6 +665,7 @@ public final class ExportSnapshot extend
     String filesGroup = null;
     String filesUser = null;
     Path outputRoot = null;
+    int bandwidthMB = Integer.MAX_VALUE;
     int filesMode = 0;
     int mappers = 0;
 
@@ -681,6 +692,8 @@ public final class ExportSnapshot extend
           filesUser = args[++i];
         } else if (cmd.equals("-chgroup")) {
           filesGroup = args[++i];
+        } else if (cmd.equals("-bandwidth")) {
+          bandwidthMB = Integer.parseInt(args[++i]);
         } else if (cmd.equals("-chmod")) {
           filesMode = Integer.parseInt(args[++i], 8);
         } else if (cmd.equals("-overwrite")) {
@@ -773,7 +786,7 @@ public final class ExportSnapshot extend
         LOG.warn("There are 0 store file to be copied. There may be no data in the table.");
       } else {
         runCopyJob(inputRoot, outputRoot, files, verifyChecksum,
-                   filesUser, filesGroup, filesMode, mappers);
+                   filesUser, filesGroup, filesMode, mappers, bandwidthMB);
       }
 
       // Step 3 - Rename fs2:/.snapshot/.tmp/<snapshot> fs2:/.snapshot/<snapshot>