You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2020/04/22 16:24:56 UTC

[hadoop] 02/03: HADOOP-16965. Refactor abfs stream configuration. (#1956)

This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 98fdbb820ea180a3c7b94a4b09ed96858a803d58
Author: Mukund Thakur <mu...@users.noreply.github.com>
AuthorDate: Tue Apr 21 21:57:29 2020 +0530

    HADOOP-16965. Refactor abfs stream configuration. (#1956)
    
    
    Contributed by Mukund Thakur.
---
 .../fs/azurebfs/AzureBlobFileSystemStore.java      | 30 +++++++---
 .../fs/azurebfs/services/AbfsInputStream.java      | 20 +++----
 .../azurebfs/services/AbfsInputStreamContext.java  | 70 ++++++++++++++++++++++
 .../fs/azurebfs/services/AbfsOutputStream.java     | 19 +++---
 .../azurebfs/services/AbfsOutputStreamContext.java | 68 +++++++++++++++++++++
 .../fs/azurebfs/services/AbfsStreamContext.java    | 26 ++++++++
 6 files changed, 204 insertions(+), 29 deletions(-)

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index a330da4..6b194a4 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -81,7 +81,9 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsAclHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
+import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
 import org.apache.hadoop.fs.azurebfs.services.AbfsPermission;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
@@ -415,12 +417,18 @@ public class AzureBlobFileSystemStore implements Closeable {
           statistics,
           AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
           0,
-          abfsConfiguration.getWriteBufferSize(),
-          abfsConfiguration.isFlushEnabled(),
-          abfsConfiguration.isOutputStreamFlushDisabled());
+          populateAbfsOutputStreamContext());
     }
   }
 
+  private AbfsOutputStreamContext populateAbfsOutputStreamContext() {
+    return new AbfsOutputStreamContext()
+            .withWriteBufferSize(abfsConfiguration.getWriteBufferSize())
+            .enableFlush(abfsConfiguration.isFlushEnabled())
+            .disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
+            .build();
+  }
+
   public void createDirectory(final Path path, final FsPermission permission, final FsPermission umask)
       throws AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("createDirectory", "createPath")) {
@@ -466,11 +474,19 @@ public class AzureBlobFileSystemStore implements Closeable {
       // Add statistics for InputStream
       return new AbfsInputStream(client, statistics,
               AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path), contentLength,
-              abfsConfiguration.getReadBufferSize(), abfsConfiguration.getReadAheadQueueDepth(),
-              abfsConfiguration.getTolerateOobAppends(), eTag);
+              populateAbfsInputStreamContext(),
+              eTag);
     }
   }
 
+  private AbfsInputStreamContext populateAbfsInputStreamContext() {
+    return new AbfsInputStreamContext()
+            .withReadBufferSize(abfsConfiguration.getReadBufferSize())
+            .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
+            .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+            .build();
+  }
+
   public OutputStream openFileForWrite(final Path path, final FileSystem.Statistics statistics, final boolean overwrite) throws
           AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("openFileForWrite", "getPathStatus")) {
@@ -502,9 +518,7 @@ public class AzureBlobFileSystemStore implements Closeable {
           statistics,
           AbfsHttpConstants.FORWARD_SLASH + getRelativePath(path),
           offset,
-          abfsConfiguration.getWriteBufferSize(),
-          abfsConfiguration.isFlushEnabled(),
-          abfsConfiguration.isOutputStreamFlushDisabled());
+          populateAbfsOutputStreamContext());
     }
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 0c06014..05c093a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -61,21 +61,19 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private boolean closed = false;
 
   public AbfsInputStream(
-      final AbfsClient client,
-      final Statistics statistics,
-      final String path,
-      final long contentLength,
-      final int bufferSize,
-      final int readAheadQueueDepth,
-      final boolean tolerateOobAppends,
-      final String eTag) {
+          final AbfsClient client,
+          final Statistics statistics,
+          final String path,
+          final long contentLength,
+          final AbfsInputStreamContext abfsInputStreamContext,
+          final String eTag) {
     this.client = client;
     this.statistics = statistics;
     this.path = path;
     this.contentLength = contentLength;
-    this.bufferSize = bufferSize;
-    this.readAheadQueueDepth = (readAheadQueueDepth >= 0) ? readAheadQueueDepth : Runtime.getRuntime().availableProcessors();
-    this.tolerateOobAppends = tolerateOobAppends;
+    this.bufferSize = abfsInputStreamContext.getReadBufferSize();
+    this.readAheadQueueDepth = abfsInputStreamContext.getReadAheadQueueDepth();
+    this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.readAheadEnabled = true;
   }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
new file mode 100644
index 0000000..cba7191
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Class to hold extra input stream configs.
+ */
+public class AbfsInputStreamContext extends AbfsStreamContext {
+
+  private int readBufferSize;
+
+  private int readAheadQueueDepth;
+
+  private boolean tolerateOobAppends;
+
+  public AbfsInputStreamContext() {
+  }
+
+  public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
+    this.readBufferSize = readBufferSize;
+    return this;
+  }
+
+  public AbfsInputStreamContext withReadAheadQueueDepth(
+          final int readAheadQueueDepth) {
+    this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
+            ? readAheadQueueDepth
+            : Runtime.getRuntime().availableProcessors();
+    return this;
+  }
+
+  public AbfsInputStreamContext withTolerateOobAppends(
+          final boolean tolerateOobAppends) {
+    this.tolerateOobAppends = tolerateOobAppends;
+    return this;
+  }
+
+  public AbfsInputStreamContext build() {
+    // TODO: validate the configured parameters here.
+    return this;
+  }
+
+  public int getReadBufferSize() {
+    return readBufferSize;
+  }
+
+  public int getReadAheadQueueDepth() {
+    return readAheadQueueDepth;
+  }
+
+  public boolean isTolerateOobAppends() {
+    return tolerateOobAppends;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
index e943169..f29cc4a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStream.java
@@ -82,23 +82,22 @@ public class AbfsOutputStream extends OutputStream implements Syncable, StreamCa
   private final Statistics statistics;
 
   public AbfsOutputStream(
-      final AbfsClient client,
-      final Statistics statistics,
-      final String path,
-      final long position,
-      final int bufferSize,
-      final boolean supportFlush,
-      final boolean disableOutputStreamFlush) {
+          final AbfsClient client,
+          final Statistics statistics,
+          final String path,
+          final long position,
+          AbfsOutputStreamContext abfsOutputStreamContext) {
     this.client = client;
     this.statistics = statistics;
     this.path = path;
     this.position = position;
     this.closed = false;
-    this.supportFlush = supportFlush;
-    this.disableOutputStreamFlush = disableOutputStreamFlush;
+    this.supportFlush = abfsOutputStreamContext.isEnableFlush();
+    this.disableOutputStreamFlush = abfsOutputStreamContext
+            .isDisableOutputStreamFlush();
     this.lastError = null;
     this.lastFlushOffset = 0;
-    this.bufferSize = bufferSize;
+    this.bufferSize = abfsOutputStreamContext.getWriteBufferSize();
     this.buffer = byteBufferPool.getBuffer(false, bufferSize).array();
     this.bufferIndex = 0;
     this.writeOperations = new ConcurrentLinkedDeque<>();
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
new file mode 100644
index 0000000..0be97c5
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsOutputStreamContext.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Class to hold extra output stream configs.
+ */
+public class AbfsOutputStreamContext extends AbfsStreamContext {
+
+  private int writeBufferSize;
+
+  private boolean enableFlush;
+
+  private boolean disableOutputStreamFlush;
+
+  public AbfsOutputStreamContext() {
+  }
+
+  public AbfsOutputStreamContext withWriteBufferSize(
+          final int writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+    return this;
+  }
+
+  public AbfsOutputStreamContext enableFlush(final boolean enableFlush) {
+    this.enableFlush = enableFlush;
+    return this;
+  }
+
+  public AbfsOutputStreamContext disableOutputStreamFlush(
+          final boolean disableOutputStreamFlush) {
+    this.disableOutputStreamFlush = disableOutputStreamFlush;
+    return this;
+  }
+
+  public AbfsOutputStreamContext build() {
+    // TODO: validate the configured parameters here.
+    return this;
+  }
+
+  public int getWriteBufferSize() {
+    return writeBufferSize;
+  }
+
+  public boolean isEnableFlush() {
+    return enableFlush;
+  }
+
+  public boolean isDisableOutputStreamFlush() {
+    return disableOutputStreamFlush;
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java
new file mode 100644
index 0000000..ee77f59
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsStreamContext.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+/**
+ * Base stream configuration class, intended to hold
+ * the configs common to input and output streams.
+ */
+public abstract class AbfsStreamContext {
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org