You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/04/22 16:24:56 UTC
[hadoop] 02/03: HADOOP-16965. Refactor abfs stream configuration. (#1956)
This is an automated email from the ASF dual-hosted git repository.
stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 98fdbb820ea180a3c7b94a4b09ed96858a803d58
Author: Mukund Thakur <mu...@users.noreply.github.com>
AuthorDate: Tue Apr 21 21:57:29 2020 +0530
HADOOP-16965. Refactor abfs stream configuration. (#1956)
Contributed by Mukund Thakur.
---
.../fs/azurebfs/AzureBlobFileSystemStore.java | 30 +++++++---
.../fs/azurebfs/services/AbfsInputStream.java | 20 +++----
.../azurebfs/services/AbfsInputStreamContext.java | 70 ++++++++++++++++++++++
.../fs/azurebfs/services/AbfsOutputStream.java | 19 +++---
.../azurebfs/services/AbfsOutputStreamContext.java | 68 +++++++++++++++++++++
.../fs/azurebfs/services/AbfsStreamContext.java | 26 ++++++++
6 files changed, 204 insertions(+), 29 deletions(-)
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index a330da4..6b194a4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -81,7 +81,9 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.AuthType;
@@ -415,12 +417,18 @@ public class AzureBlobFileSystemStore implements Closeable {
statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
0,
- abfsConfiguration.getWriteBufferSize(),
- abfsConfiguration.isFlushEnabled(),
- abfsConfiguration.isOutputStreamFlushDisabled());
+ populateAbfsOutputStreamContext());
}
}
+ private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
+ return new AbfsOutputStreamContext()
+ .withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
+ .enableFlush(abfsConfiguration.isFlushEnabled())
+ .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
+ .build();
+ }
+
public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
throws AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
@@ -466,11 +474,19 @@ public class AzureBlobFileSystemStore implements Closeable {
// Add statistics for InputStream
return new AbfsInputStream(client, statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
- abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
- abfsConfiguration.getTolerateOobAppends(), eTag);
+ populateAbfsInputStreamContext(),
+ eTag);
}
}
+ private AbfsInputStreamContext populateAbfsInputStreamContext() {
+ return new AbfsInputStreamContext()
+ .withReadBufferSize(abfsConfiguration.getReadBufferSize())
+ .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
+ .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+ .build();
+ }
+
public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
AzureBlobFileSystemException {
try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
@@ -502,9 +518,7 @@ public class AzureBlobFileSystemStore implements Closeable {
statistics,
AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
offset,
- abfsConfiguration.getWriteBufferSize(),
- abfsConfiguration.isFlushEnabled(),
- abfsConfiguration.isOutputStreamFlushDisabled());
+ populateAbfsOutputStreamContext());
}
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 0c06014..05c093a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -61,21 +61,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
private boolean closed = false;
public AbfsInputStream(
- final AbfsClient client,
- final Statistics statistics,
- final String path,
- final long contentLength,
- final int bufferSize,
- final int readAheadQueueDepth,
- final boolean tolerateOobAppends,
- final String eTag) {
+ final AbfsClient client,
+ final Statistics statistics,
+ final String path,
+ final long contentLength,
+ final AbfsInputStreamContext abfsInputStreamContext,
+ final String eTag) {
this.client = client;
this.statistics = statistics;
this.path = path;
this.contentLength = contentLength;
- this.bufferSize = bufferSize;
- this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
- this.tolerateOobAppends = tolerateOobAppends;
+ this.bufferSize = abfsInputStreamContext.getReadBufferSize();
+ this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
+ this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
this.eTag = eTag;
this.readAheadEnabled = true;
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
new file mode 100644
index 0000000..cba7191
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Class to hold extra input stream configs.
+ */
+public class AbfsInputStreamContext extends AbfsStreamContext {
+
+ private int readBufferSize;
+
+ private int readAheadQueueDepth;
+
+ private boolean tolerateOobAppends;
+
+ public AbfsInputStreamContext() {
+ }
+
+ public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
+ this.readBufferSize = readBufferSize;
+ return this;
+ }
+
+ public AbfsInputStreamContext withReadAheadQueueDepth(
+ final int readAheadQueueDepth) {
+ this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
+ ? readAheadQueueDepth
+ : Runtime.getRuntime().availableProcessors();
+ return this;
+ }
+
+ public AbfsInputStreamContext withTolerateOobAppends(
+ final boolean tolerateOobAppends) {
+ this.tolerateOobAppends = tolerateOobAppends;
+ return this;
+ }
+
+ public AbfsInputStreamContext build() {
+ // Validation of parameters to be done here.
+ return this;
+ }
+
+ public int getReadBufferSize() {
+ return readBufferSize;
+ }
+
+ public int getReadAheadQueueDepth() {
+ return readAheadQueueDepth;
+ }
+
+ public boolean isTolerateOobAppends() {
+ return tolerateOobAppends;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index e943169..f29cc4a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -82,23 +82,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
private final Statistics statistics;
public AbfsOutputStream(
- final AbfsClient client,
- final Statistics statistics,
- final String path,
- final long position,
- final int bufferSize,
- final boolean supportFlush,
- final boolean disableOutputStreamFlush) {
+ final AbfsClient client,
+ final Statistics statistics,
+ final String path,
+ final long position,
+ AbfsOutputStreamContext abfsOutputStreamContext) {
this.client = client;
this.statistics = statistics;
this.path = path;
this.position = position;
this.closed = false;
- this.supportFlush = supportFlush;
- this.disableOutputStreamFlush = disableOutputStreamFlush;
+ this.supportFlush = abfsOutputStreamContext.isEnableFlush();
+ this.disableOutputStreamFlush = abfsOutputStreamContext
+ .isDisableOutputStreamFlush();
this.lastError = null;
this.lastFlushOffset = 0;
- this.bufferSize = bufferSize;
+ this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
this.bufferIndex = 0;
this.writeOperations = new ConcurrentLinkedDeque<>();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
new file mode 100644
index 0000000..0be97c5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Class to hold extra output stream configs.
+ */
+public class AbfsOutputStreamContext extends AbfsStreamContext {
+
+ private int writeBufferSize;
+
+ private boolean enableFlush;
+
+ private boolean disableOutputStreamFlush;
+
+ public AbfsOutputStreamContext() {
+ }
+
+ public AbfsOutputStreamContext withWriteBufferSize(
+ final int writeBufferSize) {
+ this.writeBufferSize = writeBufferSize;
+ return this;
+ }
+
+ public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
+ this.enableFlush = enableFlush;
+ return this;
+ }
+
+ public AbfsOutputStreamContext disableOutputStreamFlush(
+ final boolean disableOutputStreamFlush) {
+ this.disableOutputStreamFlush = disableOutputStreamFlush;
+ return this;
+ }
+
+ public AbfsOutputStreamContext build() {
+ // Validation of parameters to be done here.
+ return this;
+ }
+
+ public int getWriteBufferSize() {
+ return writeBufferSize;
+ }
+
+ public boolean isEnableFlush() {
+ return enableFlush;
+ }
+
+ public boolean isDisableOutputStreamFlush() {
+ return disableOutputStreamFlush;
+ }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java
new file mode 100644
index 0000000..ee77f59
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Base stream configuration class which is going
+ * to store common configs among input and output streams.
+ */
+public abstract class AbfsStreamContext {
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org