You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/05/19 18:17:54 UTC

svn commit: r1595947 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ hbase-server/ hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/

Author: tedyu
Date: Mon May 19 16:17:53 2014
New Revision: 1595947

URL: http://svn.apache.org/r1595947
Log:
HBASE-11090 Backport HBASE-11083 ExportSnapshot should provide capability to limit bandwidth consumption


Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
Modified:
    hbase/trunk/hbase-server/pom.xml
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java?rev=1595947&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/hadoopbackport/ThrottledInputStream.java Mon May 19 16:17:53 2014
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.io.hadoopbackport;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * The ThrottleInputStream provides bandwidth throttling on a specified
+ * InputStream. It is implemented as a wrapper on top of another InputStream
+ * instance.
+ * The throttling works by examining the number of bytes read from the underlying
+ * InputStream from the beginning, and sleep()ing for a time interval if
+ * the byte-transfer is found exceed the specified tolerable maximum.
+ * (Thus, while the read-rate might exceed the maximum for a given short interval,
+ * the average tends towards the specified maximum, overall.)
+ */
+public class ThrottledInputStream extends InputStream {
+
+  private final InputStream rawStream;
+  private final long maxBytesPerSec;
+  private final long startTime = System.currentTimeMillis();
+
+  private long bytesRead = 0;
+  private long totalSleepTime = 0;
+
+  private static final long SLEEP_DURATION_MS = 50;
+
+  public ThrottledInputStream(InputStream rawStream) {
+    this(rawStream, Long.MAX_VALUE);
+  }
+
+  public ThrottledInputStream(InputStream rawStream, long maxBytesPerSec) {
+    assert maxBytesPerSec > 0 : "Bandwidth " + maxBytesPerSec + " is invalid"; 
+    this.rawStream = rawStream;
+    this.maxBytesPerSec = maxBytesPerSec;
+  }
+
+  @Override
+  public void close() throws IOException {
+    rawStream.close();
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read() throws IOException {
+    throttle();
+    int data = rawStream.read();
+    if (data != -1) {
+      bytesRead++;
+    }
+    return data;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read(byte[] b) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    throttle();
+    int readLen = rawStream.read(b, off, len);
+    if (readLen != -1) {
+      bytesRead += readLen;
+    }
+    return readLen;
+  }
+
+  private void throttle() throws IOException {
+    if (getBytesPerSec() > maxBytesPerSec) {
+      try {
+        Thread.sleep(SLEEP_DURATION_MS);
+        totalSleepTime += SLEEP_DURATION_MS;
+      } catch (InterruptedException e) {
+        throw new IOException("Thread aborted", e);
+      }
+    }
+  }
+
+  /**
+   * Getter for the number of bytes read from this stream, since creation.
+   * @return The number of bytes.
+   */
+  public long getTotalBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Getter for the read-rate from this stream, since creation.
+   * Calculated as bytesRead/elapsedTimeSinceStart.
+   * @return Read rate, in bytes/sec.
+   */
+  public long getBytesPerSec() {
+    long elapsed = (System.currentTimeMillis() - startTime) / 1000;
+    if (elapsed == 0) {
+      return bytesRead;
+    } else {
+      return bytesRead / elapsed;
+    }
+  }
+
+  /**
+   * Getter the total time spent in sleep.
+   * @return Number of milliseconds spent in sleep.
+   */
+  public long getTotalSleepTime() {
+    return totalSleepTime;
+  }
+
+  /** @inheritDoc */
+  @Override
+  public String toString() {
+    return "ThrottledInputStream{" +
+        "bytesRead=" + bytesRead +
+        ", maxBytesPerSec=" + maxBytesPerSec +
+        ", bytesPerSec=" + getBytesPerSec() +
+        ", totalSleepTime=" + totalSleepTime +
+        '}';
+  }
+}

Modified: hbase/trunk/hbase-server/pom.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/pom.xml?rev=1595947&r1=1595946&r2=1595947&view=diff
==============================================================================
--- hbase/trunk/hbase-server/pom.xml (original)
+++ hbase/trunk/hbase-server/pom.xml Mon May 19 16:17:53 2014
@@ -308,11 +308,6 @@
       <artifactId>commons-collections</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-distcp</artifactId>
-      <version>${hadoop-two.version}</version>
-    </dependency>
-    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase-hadoop-compat</artifactId>
     </dependency>

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java?rev=1595947&r1=1595946&r2=1595947&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/ExportSnapshot.java Mon May 19 16:17:53 2014
@@ -67,7 +67,7 @@ import org.apache.hadoop.mapreduce.Mappe
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.tools.util.ThrottledInputStream;
+import org.apache.hadoop.hbase.io.hadoopbackport.ThrottledInputStream;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;