Posted to common-commits@hadoop.apache.org by li...@apache.org on 2016/12/12 23:06:27 UTC
hadoop git commit: HADOOP-13871. ITestS3AInputStreamPerformance.testTimeToOpenAndReadWholeFileBlocks performance awful. Contributed by Steve Loughran
Repository: hadoop
Updated Branches:
refs/heads/trunk f66f61892 -> c6a392324
HADOOP-13871. ITestS3AInputStreamPerformance.testTimeToOpenAndReadWholeFileBlocks performance awful. Contributed by Steve Loughran
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c6a39232
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c6a39232
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c6a39232
Branch: refs/heads/trunk
Commit: c6a39232456fa0c98b2b9b6dbeaec762294ca01e
Parents: f66f618
Author: Mingliang Liu <li...@apache.org>
Authored: Mon Dec 12 14:55:34 2016 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Mon Dec 12 14:55:34 2016 -0800
----------------------------------------------------------------------
.../apache/hadoop/fs/s3a/S3AInputStream.java | 33 ++++++++++---
.../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 3 ++
.../tools/hadoop-aws/troubleshooting_s3a.md | 52 ++++++++++++++++++++
.../scale/ITestS3AInputStreamPerformance.java | 47 ++++++++++++++++--
.../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 20 +++++---
5 files changed, 140 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6a39232/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index dd6cdd7..3c4093d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -132,7 +132,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
throws IOException {
if (wrappedStream != null) {
- closeStream("reopen(" + reason + ")", contentRangeFinish);
+ closeStream("reopen(" + reason + ")", contentRangeFinish, false);
}
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
@@ -257,7 +257,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
// if the code reaches here, the stream needs to be reopened.
// close the stream; if read the object will be opened at the new pos
- closeStream("seekInStream()", this.contentRangeFinish);
+ closeStream("seekInStream()", this.contentRangeFinish, false);
pos = targetPos;
}
@@ -414,7 +414,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
closed = true;
try {
// close or abort the stream
- closeStream("close() operation", this.contentRangeFinish);
+ closeStream("close() operation", this.contentRangeFinish, false);
// this is actually a no-op
super.close();
} finally {
@@ -431,17 +431,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
* an abort.
*
* This does not set the {@link #closed} flag.
- *
* @param reason reason for stream being closed; used in messages
* @param length length of the stream.
+ * @param forceAbort force an abort; used if explicitly requested.
*/
- private void closeStream(String reason, long length) {
+ private void closeStream(String reason, long length, boolean forceAbort) {
if (wrappedStream != null) {
// if the amount of data remaining in the current request is greater
// than the readahead value: abort.
long remaining = remainingInCurrentRequest();
- boolean shouldAbort = remaining > readahead;
+ boolean shouldAbort = forceAbort || remaining > readahead;
if (!shouldAbort) {
try {
// clean close. This will read to the end of the stream,
@@ -470,6 +470,27 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
}
}
+ /**
+ * Forcibly reset the stream, by aborting the connection. The next
+ * {@code read()} operation will trigger the opening of a new HTTPS
+ * connection.
+ *
+ * This is potentially very inefficient, and should only be invoked
+ * in extreme circumstances. It logs at info for this reason.
+ * @return true if the connection was actually reset.
+ * @throws IOException if invoked on a closed stream.
+ */
+ @InterfaceStability.Unstable
+ public synchronized boolean resetConnection() throws IOException {
+ checkNotClosed();
+ boolean connectionOpen = wrappedStream != null;
+ if (connectionOpen) {
+ LOG.info("Forced reset of connection to {}", uri);
+ closeStream("reset()", contentRangeFinish, true);
+ }
+ return connectionOpen;
+ }
+
@Override
public synchronized int available() throws IOException {
checkNotClosed();
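For orientation, here is a minimal sketch of how the new `resetConnection()` can be reached from outside the class; the unwrapping mirrors the `getS3AInputStream()` test helper further down in this commit, while the helper class itself is a hypothetical example, not part of the change.

```java
// Illustrative sketch only. FSDataInputStream#getWrappedStream() is an
// existing Hadoop API; this wrapper class is hypothetical.
import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.s3a.S3AInputStream;

public final class ResetConnectionSketch {

  /** Force a reconnect if the wrapped stream is an S3AInputStream. */
  public static boolean maybeReset(FSDataInputStream in) throws IOException {
    InputStream inner = in.getWrappedStream();
    if (inner instanceof S3AInputStream) {
      // aborts the current HTTPS connection; the next read() reopens it
      return ((S3AInputStream) inner).resetConnection();
    }
    return false;
  }
}
```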
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6a39232/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index dedbfd4..aeb8403 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -509,6 +509,7 @@ public final class S3AUtils {
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
+ LOG.debug("Value of {} is {}", key, v);
return v;
}
@@ -529,6 +530,7 @@ public final class S3AUtils {
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
+ LOG.debug("Value of {} is {}", key, v);
return v;
}
@@ -550,6 +552,7 @@ public final class S3AUtils {
Preconditions.checkArgument(v >= min,
String.format("Value of %s: %d is below the minimum value %d",
key, v, min));
+ LOG.debug("Value of {} is {}", key, v);
return v;
}
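All three hunks above add the same debug line to the validated configuration getters. A minimal standalone sketch of that validate-then-log pattern follows; the method name and signature here are illustrative rather than the exact S3AUtils methods.

```java
// Sketch of the validate-then-log pattern applied in the hunks above;
// names are illustrative, see S3AUtils for the real getters.
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;

final class OptionGetterSketch {
  private static final Logger LOG =
      LoggerFactory.getLogger(OptionGetterSketch.class);

  static long longOption(Configuration conf, String key,
      long defVal, long min) {
    long v = conf.getLong(key, defVal);
    Preconditions.checkArgument(v >= min,
        String.format("Value of %s: %d is below the minimum value %d",
            key, v, min));
    LOG.debug("Value of {} is {}", key, v);  // the line each hunk adds
    return v;
  }
}
```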
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6a39232/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
new file mode 100644
index 0000000..d79720e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -0,0 +1,52 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# Troubleshooting S3A
+
+Here are some lower-level details and hints on troubleshooting and tuning
+the S3A client.
+
+## Logging at lower levels
+
+The AWS SDK and the Apache HTTP components can be configured to log in
+more detail, as can S3A itself.
+
+```properties
+log4j.logger.org.apache.hadoop.fs.s3a=DEBUG
+log4j.logger.com.amazonaws.request=DEBUG
+log4j.logger.org.apache.http=DEBUG
+log4j.logger.org.apache.http.wire=ERROR
+```
+
+Be aware that logging HTTP headers may leak sensitive AWS account information,
+so such logs should not be shared.
+
+## Advanced: network performance
+
+An example of this is covered in [HADOOP-13871](https://issues.apache.org/jira/browse/HADOOP-13871).
+
+1. For public data, use `curl`:
+
+        curl -O https://landsat-pds.s3.amazonaws.com/scene_list.gz
+
+1. Use `nettop` to monitor a process's connections.
+
+Consider reducing the connection timeout of the S3A client.
+
+```xml
+<property>
+  <name>fs.s3a.connection.timeout</name>
+  <value>15000</value>
+</property>
+```
+
+This *may* cause the client to react faster to network pauses.
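As a hedged companion to the XML snippet above, the same option can be set programmatically; `fs.s3a.connection.timeout` is the key from the example, while the wrapper class is illustrative.

```java
// Illustrative: set the timeout from the doc snippet in code.
import org.apache.hadoop.conf.Configuration;

final class TimeoutConfigSketch {
  static Configuration withShortTimeout() {
    Configuration conf = new Configuration();
    // 15 seconds, matching <value>15000</value> in the example above
    conf.setInt("fs.s3a.connection.timeout", 15000);
    return conf;
  }
}
```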
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6a39232/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index cc8187e..e36d086 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.junit.After;
+import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@@ -216,12 +217,18 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
long count = 0;
// implicitly rounding down here
long blockCount = len / blockSize;
+ long totalToRead = blockCount * blockSize;
+ long minimumBandwidth = 128 * 1024;
+ int maxResetCount = 4;
+ int resetCount = 0;
for (long i = 0; i < blockCount; i++) {
int offset = 0;
int remaining = blockSize;
+ long blockId = i + 1;
NanoTimer blockTimer = new NanoTimer();
int reads = 0;
while (remaining > 0) {
+ NanoTimer readTimer = new NanoTimer();
int bytesRead = in.read(block, offset, remaining);
reads++;
if (bytesRead == -1) {
@@ -230,14 +237,48 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
remaining -= bytesRead;
offset += bytesRead;
count += bytesRead;
+ readTimer.end();
+ if (bytesRead != 0) {
+ LOG.debug("Bytes in read #{}: {} , block bytes: {}," +
+ " remaining in block: {}" +
+ " duration={} nS; ns/byte: {}, bandwidth={} MB/s",
+ reads, bytesRead, blockSize - remaining, remaining,
+ readTimer.duration(),
+ readTimer.nanosPerOperation(bytesRead),
+ readTimer.bandwidthDescription(bytesRead));
+ } else {
+ LOG.warn("0 bytes returned by read() operation #{}", reads);
+ }
+ }
+ blockTimer.end("Reading block %d in %d reads", blockId, reads);
+ String bw = blockTimer.bandwidthDescription(blockSize);
+ LOG.info("Bandwidth of block {}: {} MB/s: ", blockId, bw);
+ if (bandwidth(blockTimer, blockSize) < minimumBandwidth) {
+ LOG.warn("Bandwidth {} too low on block {}: resetting connection",
+ bw, blockId);
+ Assert.assertTrue("Bandwidth of " + bw +" too low after "
+ + resetCount + " attempts", resetCount <= maxResetCount);
+ resetCount++;
+ // reset the connection
+ getS3AInputStream(in).resetConnection();
}
- blockTimer.end("Reading block %d in %d reads", i, reads);
}
- timer2.end("Time to read %d bytes in %d blocks", len, blockCount);
- bandwidth(timer2, count);
+ timer2.end("Time to read %d bytes in %d blocks", totalToRead, blockCount);
+ LOG.info("Overall Bandwidth {} MB/s; reset connections {}",
+ timer2.bandwidth(totalToRead), resetCount);
logStreamStatistics();
}
+ /**
+ * Work out the bandwidth in bytes/second.
+ * @param timer timer measuring the duration
+ * @param bytes bytes
+ * @return the number of bytes/second of the recorded operation
+ */
+ public static double bandwidth(NanoTimer timer, long bytes) {
+ return bytes * 1.0e9 / timer.duration();
+ }
+
@Test
public void testLazySeekEnabled() throws Throwable {
describe("Verify that seeks do not trigger any IO");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c6a39232/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
index c4174bf..9da621f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.fs.s3a.S3ATestConstants;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
-import org.junit.Assert;
import org.junit.Assume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,14 +162,23 @@ public class S3AScaleTestBase extends AbstractS3ATestBase {
*/
protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics(
FSDataInputStream in) {
+ return getS3AInputStream(in).getS3AStreamStatistics();
+ }
+
+ /**
+ * Get the inner stream of an input stream.
+ * Raises an exception if the inner stream is not an S3A input stream.
+ * @param in wrapper
+ * @return the inner stream
+ * @throws AssertionError if the inner stream is of the wrong type
+ */
+ protected S3AInputStream getS3AInputStream(
+ FSDataInputStream in) {
InputStream inner = in.getWrappedStream();
if (inner instanceof S3AInputStream) {
- S3AInputStream s3a = (S3AInputStream) inner;
- return s3a.getS3AStreamStatistics();
+ return (S3AInputStream) inner;
} else {
- Assert.fail("Not an S3AInputStream: " + inner);
- // never reached
- return null;
+ throw new AssertionError("Not an S3AInputStream: " + inner);
}
}
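A hedged example of the refactored helper in use from a subclass test; the path, setup, and test class here are hypothetical, not part of the commit.

```java
// Illustrative use of getS3AInputStream() from a scale test subclass.
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AInputStream;
import org.junit.Test;

public class ITestUnwrapSketch extends S3AScaleTestBase {

  @Test
  public void testUnwrapStream() throws Exception {
    Path path = new Path("/tests3a/example.txt");  // hypothetical test file
    try (FSDataInputStream in = getFileSystem().open(path)) {
      // throws AssertionError if the inner stream is not an S3AInputStream
      S3AInputStream s3aStream = getS3AInputStream(in);
      assertNotNull(s3aStream.getS3AStreamStatistics());
    }
  }
}
```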
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org