You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xi...@apache.org on 2017/12/08 05:06:29 UTC

[2/2] hadoop git commit: HADOOP-15012. Add readahead, dropbehind, and unbuffer to StreamCapabilities. Contributed by John Zhuge.

HADOOP-15012. Add readahead, dropbehind, and unbuffer to StreamCapabilities. Contributed by John Zhuge.

(cherry picked from commit bf6a660232b01642b07697a289c773ea5b97217c)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9942952d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9942952d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9942952d

Branch: refs/heads/branch-3.0
Commit: 9942952dd87852f1dc142a78d81f8e1a0920bf10
Parents: 63d81cb
Author: John Zhuge <jz...@apache.org>
Authored: Mon Nov 6 23:54:27 2017 -0800
Committer: Xiao Chen <xi...@apache.org>
Committed: Thu Dec 7 21:06:25 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/FSDataInputStream.java | 15 +++---
 .../apache/hadoop/fs/StreamCapabilities.java    | 48 +++++++++++++-----
 .../hadoop/fs/StreamCapabilitiesPolicy.java     | 51 ++++++++++++++++++++
 .../src/site/markdown/filesystem/filesystem.md  | 21 +++++---
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 16 +++++-
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 12 ++---
 .../hadoop/fs/azure/BlockBlobAppendStream.java  | 17 ++++---
 7 files changed, 140 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9942952d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index a80279d..08d71f1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable, 
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess, CanUnbuffer {
+      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool 
    * objects
@@ -227,12 +227,13 @@ public class FSDataInputStream extends DataInputStream
 
   @Override
   public void unbuffer() {
-    try {
-      ((CanUnbuffer)in).unbuffer();
-    } catch (ClassCastException e) {
-      throw new UnsupportedOperationException("this stream " +
-          in.getClass().getName() + " does not " + "support unbuffering.");
-    }
+    StreamCapabilitiesPolicy.unbuffer(in);
   }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    return in instanceof StreamCapabilities
+        && ((StreamCapabilities) in).hasCapability(capability);
+  }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9942952d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 65aa679..3549cdc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -23,27 +23,52 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * Interface to query streams for supported capabilities.
+ *
+ * Capability strings must be in lower case.
+ *
+ * Constant strings are chosen over enums in order to allow other file systems
+ * to define their own capabilities.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface StreamCapabilities {
   /**
+   * Stream hflush capability implemented by {@link Syncable#hflush()}.
+   */
+  String HFLUSH = "hflush";
+
+  /**
+   * Stream hsync capability implemented by {@link Syncable#hsync()}.
+   */
+  String HSYNC = "hsync";
+
+  /**
+   * Stream setReadahead capability implemented by
+   * {@link CanSetReadahead#setReadahead(Long)}.
+   */
+  String READAHEAD = "in:readahead";
+
+  /**
+   * Stream setDropBehind capability implemented by
+   * {@link CanSetDropBehind#setDropBehind(Boolean)}.
+   */
+  String DROPBEHIND = "dropbehind";
+
+  /**
+   * Stream unbuffer capability implemented by {@link CanUnbuffer#unbuffer()}.
+   */
+  String UNBUFFER = "in:unbuffer";
+
+  /**
    * Capabilities that a stream can support and be queried for.
+   *
+   * @deprecated use the {@code String} capability constants declared in
+   * {@link StreamCapabilities}, e.g. {@link StreamCapabilities#HFLUSH}.
    */
+  @Deprecated
   enum StreamCapability {
-    /**
-     * Stream hflush capability to flush out the data in client's buffer.
-     * Streams with this capability implement {@link Syncable} and support
-     * {@link Syncable#hflush()}.
-     */
-    HFLUSH("hflush"),
-
-    /**
-     * Stream hsync capability to flush out the data in client's buffer and
-     * the disk device. Streams with this capability implement {@link Syncable}
-     * and support {@link Syncable#hsync()}.
-     */
-    HSYNC("hsync");
+    HFLUSH(StreamCapabilities.HFLUSH),
+    HSYNC(StreamCapabilities.HSYNC);
 
     private final String capability;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9942952d/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java
new file mode 100644
index 0000000..3080780
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Static methods to implement policies for {@link StreamCapabilities}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class StreamCapabilitiesPolicy {
+  /**
+   * Implement the policy for {@link CanUnbuffer#unbuffer()}.
+   *
+   * The stream is unbuffered only if it implements
+   * {@link StreamCapabilities} and reports the
+   * {@link StreamCapabilities#UNBUFFER} capability; otherwise this method
+   * does nothing.
+   *
+   * @param in the input stream
+   * @throws UnsupportedOperationException if the stream reports the
+   *         {@link StreamCapabilities#UNBUFFER} capability but does not
+   *         implement {@link CanUnbuffer}
+   */
+  public static void unbuffer(InputStream in) {
+    if (!(in instanceof StreamCapabilities)
+        || !((StreamCapabilities) in).hasCapability(
+            StreamCapabilities.UNBUFFER)) {
+      return;
+    }
+    // Check the type explicitly instead of catching ClassCastException, so
+    // that a ClassCastException thrown by the stream's own unbuffer() is
+    // propagated as-is rather than misreported as a missing interface.
+    if (!(in instanceof CanUnbuffer)) {
+      throw new UnsupportedOperationException("this stream " +
+          in.getClass().getName() +
+          " claims to unbuffer but forgets to implement CanUnbuffer");
+    }
+    ((CanUnbuffer) in).unbuffer();
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9942952d/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index e67cbe3..91da02d 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -1244,7 +1244,8 @@ problems were not considered during the implementation of these loops.
 ## <a name="StreamCapability"></a> interface `StreamCapabilities`
 
 The `StreamCapabilities` provides a way to programmatically query the
-capabilities that an `OutputStream` supports.
+capabilities that an `OutputStream`, `InputStream`, or other FileSystem class
+supports.
 
 ```java
 public interface StreamCapabilities {
@@ -1254,12 +1255,16 @@ public interface StreamCapabilities {
 
 ### `boolean hasCapability(capability)`
 
-Return true if the `OutputStream` has the desired capability.
+Return true if the `OutputStream`, `InputStream`, or other FileSystem class
+has the desired capability.
 
 The caller can query the capabilities of a stream using a string value.
-It currently supports to query:
-
- * `StreamCapabilties.HFLUSH` ("*hflush*"): the capability to flush out the data
- in client's buffer.
- * `StreamCapabilities.HSYNC` ("*hsync*"): capability to flush out the data in
- client's buffer and the disk device.
+Here is a table of possible string values:
+
+String       | Constant   | Implements       | Description
+-------------|------------|------------------|-------------------------------
+hflush       | HFLUSH     | Syncable         | Flush out the data in the client's user buffer. After the return of this call, new readers will see the data.
+hsync        | HSYNC      | Syncable         | Flush out the data in the client's user buffer all the way to the disk device (but the disk may have it in its cache). Similar to POSIX fsync.
+in:readahead | READAHEAD  | CanSetReadahead  | Set the readahead on the input stream.
+dropbehind   | DROPBEHIND | CanSetDropBehind | Drop the cache.
+in:unbuffer  | UNBUFFER   | CanUnbuffer      | Reduce the buffering on the input stream.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9942952d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index e3d7ade..d3d6669 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
@@ -81,6 +82,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
 import org.apache.hadoop.util.StopWatch;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.core.SpanId;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
@@ -96,7 +98,7 @@ import javax.annotation.Nonnull;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
     implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-    HasEnhancedByteBufferAccess, CanUnbuffer {
+    HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1779,4 +1781,16 @@ public class DFSInputStream extends FSInputStream
   public synchronized void unbuffer() {
     closeCurrentBlockReaders();
   }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    switch (StringUtils.toLowerCase(capability)) {
+    case StreamCapabilities.READAHEAD:
+    case StreamCapabilities.DROPBEHIND:
+    case StreamCapabilities.UNBUFFER:
+      return true;
+    default:
+      return false;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9942952d/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 83f1425..7849796 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
-import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -69,6 +66,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.htrace.core.TraceScope;
 import org.slf4j.Logger;
@@ -552,11 +550,9 @@ public class DFSOutputStream extends FSOutputSummer
 
   @Override
   public boolean hasCapability(String capability) {
-    if (capability.equalsIgnoreCase(HSYNC.getValue()) ||
-        capability.equalsIgnoreCase((HFLUSH.getValue()))) {
-      return true;
-    }
-    return false;
+    String lowerCased = StringUtils.toLowerCase(capability);
+    return StreamCapabilities.HSYNC.equals(lowerCased)
+        || StreamCapabilities.HFLUSH.equals(lowerCased);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9942952d/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
index 84342cd..9a85308 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.UUID;
 import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -63,9 +64,6 @@ import com.microsoft.azure.storage.blob.BlockEntry;
 import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.BlockSearchMode;
 
-import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
-import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
-
 /**
  * Stream object that implements append for Block Blobs in WASB.
  *
@@ -550,9 +548,12 @@ public class BlockBlobAppendStream extends OutputStream implements Syncable,
    */
   @Override
   public boolean hasCapability(String capability) {
-    return compactionEnabled
-        && (capability.equalsIgnoreCase(HSYNC.getValue())
-        || capability.equalsIgnoreCase((HFLUSH.getValue())));
+    if (compactionEnabled) {
+      String lowerCased = capability.toLowerCase(Locale.ENGLISH);
+      return StreamCapabilities.HSYNC.equals(lowerCased)
+          || StreamCapabilities.HFLUSH.equals(lowerCased);
+    }
+    return false;
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org