You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2017/12/26 22:44:36 UTC
[40/50] [abbrv] hadoop git commit: HADOOP-14965. S3a input stream
"normal" fadvise mode to be adaptive
HADOOP-14965. S3a input stream "normal" fadvise mode to be adaptive
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1ba491ff
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1ba491ff
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1ba491ff
Branch: refs/heads/yarn-3409
Commit: 1ba491ff907fc5d2618add980734a3534e2be098
Parents: 13ad747
Author: Steve Loughran <st...@apache.org>
Authored: Wed Dec 20 18:25:33 2017 +0000
Committer: Steve Loughran <st...@apache.org>
Committed: Wed Dec 20 18:25:33 2017 +0000
----------------------------------------------------------------------
.../apache/hadoop/fs/s3a/S3AInputStream.java | 28 +++++++++++++++++---
.../hadoop/fs/s3a/S3AInstrumentation.java | 13 +++++++++
.../src/site/markdown/tools/hadoop-aws/index.md | 13 ++++++++-
.../scale/ITestS3AInputStreamPerformance.java | 6 ++++-
4 files changed, 54 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba491ff/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 7e6d640..0074143 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -83,7 +83,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
private final S3AInstrumentation.InputStreamStatistics streamStatistics;
private S3AEncryptionMethods serverSideEncryptionAlgorithm;
private String serverSideEncryptionKey;
- private final S3AInputPolicy inputPolicy;
+ private S3AInputPolicy inputPolicy;
private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
private final Invoker invoker;
@@ -139,12 +139,22 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
this.serverSideEncryptionAlgorithm =
s3Attributes.getServerSideEncryptionAlgorithm();
this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
- this.inputPolicy = inputPolicy;
+ setInputPolicy(inputPolicy);
setReadahead(readahead);
this.invoker = invoker;
}
/**
+ * Set/update the input policy of the stream.
+ * This updates the stream statistics.
+ * @param inputPolicy new input policy.
+ */
+ private void setInputPolicy(S3AInputPolicy inputPolicy) {
+ this.inputPolicy = inputPolicy;
+ streamStatistics.inputPolicySet(inputPolicy.ordinal());
+ }
+
+ /**
* Opens up the stream at specified target position and for given length.
*
* @param reason reason for reopen
@@ -162,8 +172,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
length, contentLength, readahead);
LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
- " streamPosition={}, nextReadPosition={}",
- uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos);
+ " streamPosition={}, nextReadPosition={}, policy={}",
+ uri, reason, targetPos, contentRangeFinish, length, pos, nextReadPos,
+ inputPolicy);
long opencount = streamStatistics.streamOpened();
GetObjectRequest request = new GetObjectRequest(bucket, key)
@@ -274,6 +285,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
} else if (diff < 0) {
// backwards seek
streamStatistics.seekBackwards(diff);
+ // if the stream is in "Normal" mode, switch to random IO at this
+ // point, as it is indicative of columnar format IO
+ if (inputPolicy.equals(S3AInputPolicy.Normal)) {
+ LOG.info("Switching to Random IO seek policy");
+ setInputPolicy(S3AInputPolicy.Random);
+ }
} else {
// targetPos == pos
if (remainingInCurrentRequest() > 0) {
@@ -443,6 +460,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
try {
// close or abort the stream
closeStream("close() operation", this.contentRangeFinish, false);
+ LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
// this is actually a no-op
super.close();
} finally {
@@ -713,6 +731,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
break;
case Normal:
+ // normal is considered sequential until a backwards seek switches
+ // it to 'Random'
default:
rangeLimit = contentLength;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba491ff/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 0fbcc00..d843347 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -667,6 +667,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
public long readsIncomplete;
public long bytesReadInClose;
public long bytesDiscardedInAbort;
+ public long policySetCount;
+ public long inputPolicy;
private InputStreamStatistics() {
}
@@ -783,6 +785,15 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
}
/**
+ * The input policy has been switched.
+ * @param updatedPolicy enum value of new policy.
+ */
+ public void inputPolicySet(int updatedPolicy) {
+ policySetCount++;
+ inputPolicy = updatedPolicy;
+ }
+
+ /**
* String operator describes all the current statistics.
* <b>Important: there are no guarantees as to the stability
* of this value.</b>
@@ -813,6 +824,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
sb.append(", ReadsIncomplete=").append(readsIncomplete);
sb.append(", BytesReadInClose=").append(bytesReadInClose);
sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
+ sb.append(", InputPolicy=").append(inputPolicy);
+ sb.append(", InputPolicySetCount=").append(policySetCount);
sb.append('}');
return sb.toString();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba491ff/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index fbcd54a..7eebf5c 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -1553,7 +1553,18 @@ backward seeks.
*"normal" (default)*
-This is currently the same as "sequential", though it may evolve in future.
+The "Normal" policy starts off reading a file in "sequential" mode,
+but if the caller seeks backwards in the stream, it switches from
+sequential to "random".
+
+This policy effectively recognizes the initial read pattern of columnar
+storage formats (e.g. Apache ORC and Apache Parquet), which seek to the end
+of a file, read in index data and then seek backwards to selectively read
columns. The first seeks may be expensive compared to the random policy,
+however the overall process is much less expensive than either sequentially
+reading through a file with the "random" policy, or reading columnar data
with the "sequential" policy. When the exact format/recommended
seek policy of the data is known in advance, it may be better to set
that specific policy explicitly rather than rely on this adaptive behavior.
*"random"*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1ba491ff/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
index 83ab210..efd96c4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java
@@ -427,7 +427,11 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
assertEquals("streams aborted in " + streamStatistics,
- 4, streamStatistics.aborted);
+ 1, streamStatistics.aborted);
+ assertEquals("policy changes in " + streamStatistics,
+ 2, streamStatistics.policySetCount);
+ assertEquals("input policy in " + streamStatistics,
+ S3AInputPolicy.Random.ordinal(), streamStatistics.inputPolicy);
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org