You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/06/26 12:50:10 UTC

[36/50] hadoop git commit: HDFS-11789. Maintain Short-Circuit Read Statistics. Contributed by Hanisha Koneru.

HDFS-11789. Maintain Short-Circuit Read Statistics. Contributed by Hanisha Koneru.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6d116ffa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6d116ffa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6d116ffa

Branch: refs/heads/HADOOP-13345
Commit: 6d116ffad23b470f8e9ca131d8e89cbbbb4378d7
Parents: 49aa60e
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu Jun 22 13:35:56 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 22 13:35:56 2017 -0700

----------------------------------------------------------------------
 .../hdfs/client/HdfsClientConfigKeys.java       |   4 +
 .../hdfs/client/impl/BlockReaderLocal.java      |  52 ++++-
 .../hadoop/hdfs/client/impl/DfsClientConf.java  |  26 +++
 .../impl/metrics/BlockReaderIoProvider.java     |  89 ++++++++
 .../impl/metrics/BlockReaderLocalMetrics.java   |  78 +++++++
 .../hdfs/client/impl/metrics/package-info.java  |  27 +++
 .../client/impl/TestBlockReaderIoProvider.java  |  75 ++++++
 .../impl/TestBlockReaderLocalMetrics.java       | 227 +++++++++++++++++++
 8 files changed, 566 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index fbc8d89..5667989 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -343,6 +343,10 @@ public interface HdfsClientConfigKeys {
       int     STREAMS_CACHE_SIZE_DEFAULT = 256;
       String  STREAMS_CACHE_EXPIRY_MS_KEY = PREFIX + "streams.cache.expiry.ms";
       long    STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5*MINUTE;
+
+      String  METRICS_SAMPLING_PERCENTAGE_KEY =
+          PREFIX + "metrics.sampling.percentage";
+      int     METRICS_SAMPLING_PERCENTAGE_DEFAULT = 0;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
index 1b38996..df0f65f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
@@ -17,17 +17,16 @@
  */
 package org.apache.hadoop.hdfs.client.impl;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.EnumSet;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.BlockReader;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -35,15 +34,19 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
 import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.hadoop.util.Timer;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.EnumSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
 /**
  * BlockReaderLocal enables local short circuited reads. If the DFS client is on
  * the same machine as the datanode, then the client can read files directly
@@ -66,6 +69,11 @@ class BlockReaderLocal implements BlockReader {
 
   private static final DirectBufferPool bufferPool = new DirectBufferPool();
 
+  private static BlockReaderLocalMetrics metrics;
+  private static Lock metricsInitializationLock = new ReentrantLock();
+  private final BlockReaderIoProvider blockReaderIoProvider;
+  private static final Timer TIMER = new Timer();
+
   public static class Builder {
     private final int bufferSize;
     private boolean verifyChecksum;
@@ -76,8 +84,10 @@ class BlockReaderLocal implements BlockReader {
     private ExtendedBlock block;
     private StorageType storageType;
     private Tracer tracer;
+    private ShortCircuitConf shortCircuitConf;
 
     public Builder(ShortCircuitConf conf) {
+      this.shortCircuitConf = conf;
       this.maxReadahead = Integer.MAX_VALUE;
       this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
       this.bufferSize = conf.getShortCircuitBufferSize();
@@ -269,6 +279,20 @@ class BlockReaderLocal implements BlockReader {
     this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
     this.storageType = builder.storageType;
     this.tracer = builder.tracer;
+
+    if (builder.shortCircuitConf.isScrMetricsEnabled()) {
+      metricsInitializationLock.lock();
+      try {
+        if (metrics == null) {
+          metrics = BlockReaderLocalMetrics.create();
+        }
+      } finally {
+        metricsInitializationLock.unlock();
+      }
+    }
+
+    this.blockReaderIoProvider = new BlockReaderIoProvider(
+        builder.shortCircuitConf, metrics, TIMER);
   }
 
   private synchronized void createDataBufIfNeeded() {
@@ -342,7 +366,7 @@ class BlockReaderLocal implements BlockReader {
       long startDataPos = dataPos;
       int startBufPos = buf.position();
       while (buf.hasRemaining()) {
-        int nRead = dataIn.read(buf, dataPos);
+        int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
         if (nRead < 0) {
           break;
         }
@@ -435,7 +459,7 @@ class BlockReaderLocal implements BlockReader {
     freeChecksumBufIfExists();
     int total = 0;
     while (buf.hasRemaining()) {
-      int nRead = dataIn.read(buf, dataPos);
+      int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
       if (nRead <= 0) break;
       dataPos += nRead;
       total += nRead;
@@ -574,7 +598,8 @@ class BlockReaderLocal implements BlockReader {
         int len) throws IOException {
     freeDataBufIfExists();
     freeChecksumBufIfExists();
-    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
+    int nRead = blockReaderIoProvider.read(
+        dataIn, ByteBuffer.wrap(arr, off, len), dataPos);
     if (nRead > 0) {
       dataPos += nRead;
     } else if ((nRead == 0) && (dataPos == dataIn.size())) {
@@ -627,6 +652,9 @@ class BlockReaderLocal implements BlockReader {
     replica.unref();
     freeDataBufIfExists();
     freeChecksumBufIfExists();
+    if (metrics != null) {
+      metrics.collectThreadLocalStates();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index b2fd487..332abb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -593,6 +593,10 @@ public class DfsClientConf {
     private final long shortCircuitStreamsCacheExpiryMs;
     private final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
 
+    // Short Circuit Read Metrics
+    private final boolean scrMetricsEnabled;
+    private final int scrMetricsSamplingPercentage;
+
     private final boolean shortCircuitMmapEnabled;
     private final int shortCircuitMmapCacheSize;
     private final long shortCircuitMmapCacheExpiryMs;
@@ -615,6 +619,20 @@ public class DfsClientConf {
       shortCircuitLocalReads = conf.getBoolean(
           Read.ShortCircuit.KEY,
           Read.ShortCircuit.DEFAULT);
+      int scrSamplingPercentage = conf.getInt(
+          Read.ShortCircuit.METRICS_SAMPLING_PERCENTAGE_KEY,
+          Read.ShortCircuit.METRICS_SAMPLING_PERCENTAGE_DEFAULT);
+      if (scrSamplingPercentage <= 0) {
+        scrMetricsSamplingPercentage = 0;
+        scrMetricsEnabled = false;
+      } else if (scrSamplingPercentage > 100) {
+        scrMetricsSamplingPercentage = 100;
+        scrMetricsEnabled = true;
+      } else {
+        scrMetricsSamplingPercentage = scrSamplingPercentage;
+        scrMetricsEnabled = true;
+      }
+
       domainSocketDataTraffic = conf.getBoolean(
           DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
           DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
@@ -693,6 +711,14 @@ public class DfsClientConf {
       return shortCircuitLocalReads;
     }
 
+    public boolean isScrMetricsEnabled() {
+      return scrMetricsEnabled;
+    }
+
+    public int getScrMetricsSamplingPercentage() {
+      return scrMetricsSamplingPercentage;
+    }
+
     public boolean isDomainSocketDataTraffic() {
       return domainSocketDataTraffic;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java
new file mode 100644
index 0000000..0792db8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl.metrics;
+
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Profiles {@link org.apache.hadoop.hdfs.client.impl.BlockReaderLocal} short
+ * circuit read latencies when ShortCircuit read metrics is enabled through
+ * {@link ShortCircuitConf#scrMetricsEnabled}.
+ */
+public class BlockReaderIoProvider {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      BlockReaderIoProvider.class);
+
+  private final BlockReaderLocalMetrics metrics;
+  private final boolean isEnabled;
+  private final int sampleRangeMax;
+  private final Timer timer;
+
+  // Threshold in milliseconds above which a warning should be flagged.
+  private static final long SLOW_READ_WARNING_THRESHOLD_MS = 1000;
+  private boolean isWarningLogged = false;
+
+  public BlockReaderIoProvider(@Nullable ShortCircuitConf conf,
+      BlockReaderLocalMetrics metrics, Timer timer) {
+    if (conf != null) {
+      isEnabled = conf.isScrMetricsEnabled();
+      sampleRangeMax = (Integer.MAX_VALUE / 100) *
+          conf.getScrMetricsSamplingPercentage();
+      this.metrics = metrics;
+      this.timer = timer;
+    } else {
+      this.isEnabled = false;
+      this.sampleRangeMax = 0;
+      this.metrics = null;
+      this.timer = null;
+    }
+  }
+
+  public int read(FileChannel dataIn, ByteBuffer dst, long position)
+      throws IOException{
+    final int nRead;
+    if (isEnabled && (ThreadLocalRandom.current().nextInt() < sampleRangeMax)) {
+      long begin = timer.monotonicNow();
+      nRead = dataIn.read(dst, position);
+      long latency = timer.monotonicNow() - begin;
+      addLatency(latency);
+    } else {
+      nRead = dataIn.read(dst, position);
+    }
+    return nRead;
+  }
+
+  private void addLatency(long latency) {
+    metrics.addShortCircuitReadLatency(latency);
+    if (latency > SLOW_READ_WARNING_THRESHOLD_MS && !isWarningLogged) {
+      LOG.warn(String.format("The Short Circuit Local Read latency, %d ms, " +
+          "is higher then the threshold (%d ms). Suppressing further warnings" +
+          " for this BlockReaderLocal.",
+          latency, SLOW_READ_WARNING_THRESHOLD_MS));
+      isWarningLogged = true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java
new file mode 100644
index 0000000..61b497e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
+
+/**
+ * This class maintains a metric of rolling average latency for short circuit
+ * reads.
+ */
+@InterfaceAudience.Private
+@Metrics(name="HdfsShortCircuitReads",
+         about="Block Reader Local's Short Circuit Read latency",
+         context="dfs")
+public class BlockReaderLocalMetrics {
+
+  @Metric(value = "short circuit read operation rate", valueName = "LatencyMs")
+  private MutableRollingAverages shortCircuitReadRollingAverages;
+
+  private static final String SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME =
+      "HdfsShortCircuitReads";
+  private static final String SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_NAME =
+      "ShortCircuitLocalReads";
+
+  public static BlockReaderLocalMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    BlockReaderLocalMetrics metrics = new BlockReaderLocalMetrics();
+
+    ms.register(
+        SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME, null, metrics);
+    return metrics;
+  }
+
+  /**
+   * Adds short circuit read elapsed time.
+   */
+  public void addShortCircuitReadLatency(final long latency) {
+    shortCircuitReadRollingAverages.add(
+        SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_NAME, latency);
+  }
+
+  /**
+   * Collects states maintained in {@link ThreadLocal}, if any.
+   */
+  public void collectThreadLocalStates() {
+    shortCircuitReadRollingAverages.collectThreadLocalStates();
+  }
+
+  /**
+   * Get the MutableRollingAverage metric for testing only.
+   * @return
+   */
+  @VisibleForTesting
+  public MutableRollingAverages getShortCircuitReadRollingAverages() {
+    return shortCircuitReadRollingAverages;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java
new file mode 100644
index 0000000..a97ed43
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for tracking Block Reader Local's latencies.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.client.impl.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java
new file mode 100644
index 0000000..3eae516
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests {@link BlockReaderIoProvider}'s profiling of short circuit read
+ * latencies.
+ */
+public class TestBlockReaderIoProvider {
+
+  private static final long SLOW_READ_THRESHOLD = 5000;
+
+  private static final FakeTimer TIMER = new FakeTimer();
+
+  @Test(timeout = 300_000)
+  public void testSlowShortCircuitReadsIsRecorded() throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit
+        .METRICS_SAMPLING_PERCENTAGE_KEY, 100);
+    DfsClientConf clientConf = new DfsClientConf(conf);
+
+    BlockReaderLocalMetrics metrics = Mockito.mock(
+        BlockReaderLocalMetrics.class);
+
+    FileChannel dataIn = Mockito.mock(FileChannel.class);
+    Mockito.when(dataIn.read(any(ByteBuffer.class), anyLong())).thenAnswer(
+        new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            TIMER.advance(SLOW_READ_THRESHOLD);
+            return 0;
+          }
+        });
+
+    BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
+        clientConf.getShortCircuitConf(), metrics, TIMER);
+
+    blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
+
+    Mockito.verify(metrics, times(1)).addShortCircuitReadLatency(anyLong());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java
new file mode 100644
index 0000000..b461f2e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
+import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
+import org.apache.hadoop.test.GenericTestUtils;
+import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import org.apache.hadoop.util.FakeTimer;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests {@link BlockReaderLocalMetrics}'s statistics.
+ */
+public class TestBlockReaderLocalMetrics {
+  private static final long ROLLING_AVERAGES_WINDOW_LENGTH_MS = 1000;
+  private static final int ROLLING_AVERAGE_NUM_WINDOWS = 5;
+  private static final long SLOW_READ_DELAY = 2000;
+  private static final String SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME =
+      "HdfsShortCircuitReads";
+  private static final String SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME =
+      "[ShortCircuitLocalReads]RollingAvgLatencyMs";
+
+  private static final FakeTimer TIMER = new FakeTimer();
+
+  private static HdfsConfiguration conf = new HdfsConfiguration();
+  private static DfsClientConf clientConf;
+
+  static {
+    conf = new HdfsConfiguration();
+    conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit
+        .METRICS_SAMPLING_PERCENTAGE_KEY, 100);
+    clientConf = new DfsClientConf(conf);
+  }
+
+  @Test(timeout = 300_000)
+  public void testSlowShortCircuitReadsStatsRecorded() throws IOException,
+      InterruptedException, TimeoutException {
+
+    BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
+    MutableRollingAverages shortCircuitReadRollingAverages = metrics
+        .getShortCircuitReadRollingAverages();
+    MetricsTestHelper.replaceRollingAveragesScheduler(
+        shortCircuitReadRollingAverages,
+        ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
+        TimeUnit.MILLISECONDS);
+
+    FileChannel dataIn = Mockito.mock(FileChannel.class);
+    Mockito.when(dataIn.read(any(ByteBuffer.class), anyLong())).thenAnswer(
+        new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            TIMER.advance(SLOW_READ_DELAY);
+            return 0;
+          }
+        });
+
+    BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
+        clientConf.getShortCircuitConf(), metrics, TIMER);
+
+    blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
+    blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        metrics.collectThreadLocalStates();
+        return shortCircuitReadRollingAverages.getStats(0).size() > 0;
+      }
+    }, 500, 10000);
+
+    MetricsRecordBuilder rb = getMetrics(
+        SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
+    double averageLatency = getDoubleGauge(
+        SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
+    assertTrue("Average Latency of Short Circuit Reads lower than expected",
+        averageLatency >= SLOW_READ_DELAY);
+  }
+
+  @Test(timeout = 300_000)
+  public void testMutlipleBlockReaderIoProviderStats() throws IOException,
+      InterruptedException, TimeoutException {
+
+    BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
+    MutableRollingAverages shortCircuitReadRollingAverages = metrics
+        .getShortCircuitReadRollingAverages();
+    MetricsTestHelper.replaceRollingAveragesScheduler(
+        shortCircuitReadRollingAverages,
+        ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
+        TimeUnit.MILLISECONDS);
+
+    FileChannel dataIn1 = Mockito.mock(FileChannel.class);
+    FileChannel dataIn2 = Mockito.mock(FileChannel.class);
+
+    Mockito.when(dataIn1.read(any(ByteBuffer.class), anyLong())).thenAnswer(
+        new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            TIMER.advance(SLOW_READ_DELAY);
+            return 0;
+          }
+        });
+
+    Mockito.when(dataIn2.read(any(ByteBuffer.class), anyLong())).thenAnswer(
+        new Answer<Object>() {
+          @Override
+          public Object answer(InvocationOnMock invocation) throws Throwable {
+            TIMER.advance(SLOW_READ_DELAY*3);
+            return 0;
+          }
+        });
+
+    BlockReaderIoProvider blockReaderIoProvider1 = new BlockReaderIoProvider(
+        clientConf.getShortCircuitConf(), metrics, TIMER);
+    BlockReaderIoProvider blockReaderIoProvider2 = new BlockReaderIoProvider(
+        clientConf.getShortCircuitConf(), metrics, TIMER);
+
+    blockReaderIoProvider1.read(dataIn1, any(ByteBuffer.class), anyLong());
+    blockReaderIoProvider2.read(dataIn2, any(ByteBuffer.class), anyLong());
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        metrics.collectThreadLocalStates();
+        return shortCircuitReadRollingAverages.getStats(0).size() > 0;
+      }
+    }, 500, 10000);
+
+    MetricsRecordBuilder rb = getMetrics(
+        SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
+    double averageLatency = getDoubleGauge(
+        SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
+
+    assertTrue("Average Latency of Short Circuit Reads lower than expected",
+        averageLatency >= SLOW_READ_DELAY*2);
+  }
+
+  @Test(timeout = 300_000)
+  public void testSlowShortCircuitReadsAverageLatencyValue() throws IOException,
+      InterruptedException, TimeoutException {
+
+    BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
+    final MutableRollingAverages shortCircuitReadRollingAverages = metrics
+        .getShortCircuitReadRollingAverages();
+    MetricsTestHelper.replaceRollingAveragesScheduler(
+        shortCircuitReadRollingAverages,
+        ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
+        TimeUnit.MILLISECONDS);
+
+    Random random = new Random();
+    FileChannel[] dataIns = new FileChannel[5];
+    long totalDelay = 0;
+
+    for (int i = 0; i < 5; i++) {
+      dataIns[i] = Mockito.mock(FileChannel.class);
+      long delay = SLOW_READ_DELAY * random.nextInt(5);
+      Mockito.when(dataIns[i].read(any(ByteBuffer.class), anyLong()))
+          .thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+              TIMER.advance(delay);
+              return 0;
+            }
+          });
+      totalDelay += delay;
+    }
+    long expectedAvgLatency = totalDelay / 5;
+
+    BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
+        clientConf.getShortCircuitConf(), metrics, TIMER);
+
+    for (int i = 0; i < 5; i++) {
+      blockReaderIoProvider.read(dataIns[i], any(ByteBuffer.class), anyLong());
+    }
+
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        metrics.collectThreadLocalStates();
+        return shortCircuitReadRollingAverages.getStats(0).size() > 0;
+      }
+    }, 500, 10000);
+
+    MetricsRecordBuilder rb = getMetrics(
+        SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
+    double averageLatency = getDoubleGauge(
+        SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
+
+    assertTrue("Average Latency of Short Circuit Reads lower than expected",
+        averageLatency >= expectedAvgLatency);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org