You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/06/26 12:50:10 UTC
[36/50] hadoop git commit: HDFS-11789. Maintain Short-Circuit Read
Statistics. Contributed by Hanisha Koneru.
HDFS-11789. Maintain Short-Circuit Read Statistics. Contributed by Hanisha Koneru.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6d116ffa
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6d116ffa
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6d116ffa
Branch: refs/heads/HADOOP-13345
Commit: 6d116ffad23b470f8e9ca131d8e89cbbbb4378d7
Parents: 49aa60e
Author: Arpit Agarwal <ar...@apache.org>
Authored: Thu Jun 22 13:35:56 2017 -0700
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Thu Jun 22 13:35:56 2017 -0700
----------------------------------------------------------------------
.../hdfs/client/HdfsClientConfigKeys.java | 4 +
.../hdfs/client/impl/BlockReaderLocal.java | 52 ++++-
.../hadoop/hdfs/client/impl/DfsClientConf.java | 26 +++
.../impl/metrics/BlockReaderIoProvider.java | 89 ++++++++
.../impl/metrics/BlockReaderLocalMetrics.java | 78 +++++++
.../hdfs/client/impl/metrics/package-info.java | 27 +++
.../client/impl/TestBlockReaderIoProvider.java | 75 ++++++
.../impl/TestBlockReaderLocalMetrics.java | 227 +++++++++++++++++++
8 files changed, 566 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index fbc8d89..5667989 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -343,6 +343,10 @@ public interface HdfsClientConfigKeys {
int STREAMS_CACHE_SIZE_DEFAULT = 256;
String STREAMS_CACHE_EXPIRY_MS_KEY = PREFIX + "streams.cache.expiry.ms";
long STREAMS_CACHE_EXPIRY_MS_DEFAULT = 5*MINUTE;
+
+ String METRICS_SAMPLING_PERCENTAGE_KEY =
+ PREFIX + "metrics.sampling.percentage";
+ int METRICS_SAMPLING_PERCENTAGE_DEFAULT = 0;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
index 1b38996..df0f65f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderLocal.java
@@ -17,17 +17,16 @@
*/
package org.apache.hadoop.hdfs.client.impl;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.EnumSet;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.BlockReader;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
@@ -35,15 +34,19 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DirectBufferPool;
+import org.apache.hadoop.util.Timer;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.EnumSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
/**
* BlockReaderLocal enables local short circuited reads. If the DFS client is on
* the same machine as the datanode, then the client can read files directly
@@ -66,6 +69,11 @@ class BlockReaderLocal implements BlockReader {
private static final DirectBufferPool bufferPool = new DirectBufferPool();
+ private static BlockReaderLocalMetrics metrics;
+ private static Lock metricsInitializationLock = new ReentrantLock();
+ private final BlockReaderIoProvider blockReaderIoProvider;
+ private static final Timer TIMER = new Timer();
+
public static class Builder {
private final int bufferSize;
private boolean verifyChecksum;
@@ -76,8 +84,10 @@ class BlockReaderLocal implements BlockReader {
private ExtendedBlock block;
private StorageType storageType;
private Tracer tracer;
+ private ShortCircuitConf shortCircuitConf;
public Builder(ShortCircuitConf conf) {
+ this.shortCircuitConf = conf;
this.maxReadahead = Integer.MAX_VALUE;
this.verifyChecksum = !conf.isSkipShortCircuitChecksums();
this.bufferSize = conf.getShortCircuitBufferSize();
@@ -269,6 +279,20 @@ class BlockReaderLocal implements BlockReader {
this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
this.storageType = builder.storageType;
this.tracer = builder.tracer;
+
+ if (builder.shortCircuitConf.isScrMetricsEnabled()) {
+ metricsInitializationLock.lock();
+ try {
+ if (metrics == null) {
+ metrics = BlockReaderLocalMetrics.create();
+ }
+ } finally {
+ metricsInitializationLock.unlock();
+ }
+ }
+
+ this.blockReaderIoProvider = new BlockReaderIoProvider(
+ builder.shortCircuitConf, metrics, TIMER);
}
private synchronized void createDataBufIfNeeded() {
@@ -342,7 +366,7 @@ class BlockReaderLocal implements BlockReader {
long startDataPos = dataPos;
int startBufPos = buf.position();
while (buf.hasRemaining()) {
- int nRead = dataIn.read(buf, dataPos);
+ int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
if (nRead < 0) {
break;
}
@@ -435,7 +459,7 @@ class BlockReaderLocal implements BlockReader {
freeChecksumBufIfExists();
int total = 0;
while (buf.hasRemaining()) {
- int nRead = dataIn.read(buf, dataPos);
+ int nRead = blockReaderIoProvider.read(dataIn, buf, dataPos);
if (nRead <= 0) break;
dataPos += nRead;
total += nRead;
@@ -574,7 +598,8 @@ class BlockReaderLocal implements BlockReader {
int len) throws IOException {
freeDataBufIfExists();
freeChecksumBufIfExists();
- int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
+ int nRead = blockReaderIoProvider.read(
+ dataIn, ByteBuffer.wrap(arr, off, len), dataPos);
if (nRead > 0) {
dataPos += nRead;
} else if ((nRead == 0) && (dataPos == dataIn.size())) {
@@ -627,6 +652,9 @@ class BlockReaderLocal implements BlockReader {
replica.unref();
freeDataBufIfExists();
freeChecksumBufIfExists();
+ if (metrics != null) {
+ metrics.collectThreadLocalStates();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
index b2fd487..332abb5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
@@ -593,6 +593,10 @@ public class DfsClientConf {
private final long shortCircuitStreamsCacheExpiryMs;
private final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
+ // Short Circuit Read Metrics
+ private final boolean scrMetricsEnabled;
+ private final int scrMetricsSamplingPercentage;
+
private final boolean shortCircuitMmapEnabled;
private final int shortCircuitMmapCacheSize;
private final long shortCircuitMmapCacheExpiryMs;
@@ -615,6 +619,20 @@ public class DfsClientConf {
shortCircuitLocalReads = conf.getBoolean(
Read.ShortCircuit.KEY,
Read.ShortCircuit.DEFAULT);
+ int scrSamplingPercentage = conf.getInt(
+ Read.ShortCircuit.METRICS_SAMPLING_PERCENTAGE_KEY,
+ Read.ShortCircuit.METRICS_SAMPLING_PERCENTAGE_DEFAULT);
+ if (scrSamplingPercentage <= 0) {
+ scrMetricsSamplingPercentage = 0;
+ scrMetricsEnabled = false;
+ } else if (scrSamplingPercentage > 100) {
+ scrMetricsSamplingPercentage = 100;
+ scrMetricsEnabled = true;
+ } else {
+ scrMetricsSamplingPercentage = scrSamplingPercentage;
+ scrMetricsEnabled = true;
+ }
+
domainSocketDataTraffic = conf.getBoolean(
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,
DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT);
@@ -693,6 +711,14 @@ public class DfsClientConf {
return shortCircuitLocalReads;
}
+ public boolean isScrMetricsEnabled() {
+ return scrMetricsEnabled;
+ }
+
+ public int getScrMetricsSamplingPercentage() {
+ return scrMetricsSamplingPercentage;
+ }
+
public boolean isDomainSocketDataTraffic() {
return domainSocketDataTraffic;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java
new file mode 100644
index 0000000..0792db8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderIoProvider.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl.metrics;
+
+import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
+import org.apache.hadoop.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Profiles {@link org.apache.hadoop.hdfs.client.impl.BlockReaderLocal} short
+ * circuit read latencies when ShortCircuit read metrics is enabled through
+ * {@link ShortCircuitConf#scrMetricsEnabled}.
+ */
+public class BlockReaderIoProvider {
+ public static final Logger LOG = LoggerFactory.getLogger(
+ BlockReaderIoProvider.class);
+
+ private final BlockReaderLocalMetrics metrics;
+ private final boolean isEnabled;
+ private final int sampleRangeMax;
+ private final Timer timer;
+
+ // Threshold in milliseconds above which a warning should be flagged.
+ private static final long SLOW_READ_WARNING_THRESHOLD_MS = 1000;
+ private boolean isWarningLogged = false;
+
+ public BlockReaderIoProvider(@Nullable ShortCircuitConf conf,
+ BlockReaderLocalMetrics metrics, Timer timer) {
+ if (conf != null) {
+ isEnabled = conf.isScrMetricsEnabled();
+ sampleRangeMax = (Integer.MAX_VALUE / 100) *
+ conf.getScrMetricsSamplingPercentage();
+ this.metrics = metrics;
+ this.timer = timer;
+ } else {
+ this.isEnabled = false;
+ this.sampleRangeMax = 0;
+ this.metrics = null;
+ this.timer = null;
+ }
+ }
+
+ public int read(FileChannel dataIn, ByteBuffer dst, long position)
+ throws IOException{
+ final int nRead;
+ if (isEnabled && (ThreadLocalRandom.current().nextInt() < sampleRangeMax)) {
+ long begin = timer.monotonicNow();
+ nRead = dataIn.read(dst, position);
+ long latency = timer.monotonicNow() - begin;
+ addLatency(latency);
+ } else {
+ nRead = dataIn.read(dst, position);
+ }
+ return nRead;
+ }
+
+ private void addLatency(long latency) {
+ metrics.addShortCircuitReadLatency(latency);
+ if (latency > SLOW_READ_WARNING_THRESHOLD_MS && !isWarningLogged) {
+ LOG.warn(String.format("The Short Circuit Local Read latency, %d ms, " +
+ "is higher then the threshold (%d ms). Suppressing further warnings" +
+ " for this BlockReaderLocal.",
+ latency, SLOW_READ_WARNING_THRESHOLD_MS));
+ isWarningLogged = true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java
new file mode 100644
index 0000000..61b497e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/BlockReaderLocalMetrics.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl.metrics;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
+
+/**
+ * This class maintains a metric of rolling average latency for short circuit
+ * reads.
+ */
+@InterfaceAudience.Private
+@Metrics(name="HdfsShortCircuitReads",
+ about="Block Reader Local's Short Circuit Read latency",
+ context="dfs")
+public class BlockReaderLocalMetrics {
+
+ @Metric(value = "short circuit read operation rate", valueName = "LatencyMs")
+ private MutableRollingAverages shortCircuitReadRollingAverages;
+
+ private static final String SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME =
+ "HdfsShortCircuitReads";
+ private static final String SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_NAME =
+ "ShortCircuitLocalReads";
+
+ public static BlockReaderLocalMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ BlockReaderLocalMetrics metrics = new BlockReaderLocalMetrics();
+
+ ms.register(
+ SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME, null, metrics);
+ return metrics;
+ }
+
+ /**
+ * Adds short circuit read elapsed time.
+ */
+ public void addShortCircuitReadLatency(final long latency) {
+ shortCircuitReadRollingAverages.add(
+ SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_NAME, latency);
+ }
+
+ /**
+ * Collects states maintained in {@link ThreadLocal}, if any.
+ */
+ public void collectThreadLocalStates() {
+ shortCircuitReadRollingAverages.collectThreadLocalStates();
+ }
+
+ /**
+ * Get the MutableRollingAverage metric for testing only.
+ * @return
+ */
+ @VisibleForTesting
+ public MutableRollingAverages getShortCircuitReadRollingAverages() {
+ return shortCircuitReadRollingAverages;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java
new file mode 100644
index 0000000..a97ed43
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/metrics/package-info.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Support for tracking Block Reader Local's latencies.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.hdfs.client.impl.metrics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java
new file mode 100644
index 0000000..3eae516
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderIoProvider.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
+import org.apache.hadoop.util.FakeTimer;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.times;
+
+/**
+ * Tests {@link BlockReaderIoProvider}'s profiling of short circuit read
+ * latencies.
+ */
+public class TestBlockReaderIoProvider {
+
+ private static final long SLOW_READ_THRESHOLD = 5000;
+
+ private static final FakeTimer TIMER = new FakeTimer();
+
+ @Test(timeout = 300_000)
+ public void testSlowShortCircuitReadsIsRecorded() throws IOException {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit
+ .METRICS_SAMPLING_PERCENTAGE_KEY, 100);
+ DfsClientConf clientConf = new DfsClientConf(conf);
+
+ BlockReaderLocalMetrics metrics = Mockito.mock(
+ BlockReaderLocalMetrics.class);
+
+ FileChannel dataIn = Mockito.mock(FileChannel.class);
+ Mockito.when(dataIn.read(any(ByteBuffer.class), anyLong())).thenAnswer(
+ new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ TIMER.advance(SLOW_READ_THRESHOLD);
+ return 0;
+ }
+ });
+
+ BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
+ clientConf.getShortCircuitConf(), metrics, TIMER);
+
+ blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
+
+ Mockito.verify(metrics, times(1)).addShortCircuitReadLatency(anyLong());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6d116ffa/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java
new file mode 100644
index 0000000..b461f2e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocalMetrics.java
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client.impl;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderIoProvider;
+import org.apache.hadoop.hdfs.client.impl.metrics.BlockReaderLocalMetrics;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
+import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
+import org.apache.hadoop.test.GenericTestUtils;
+import static org.apache.hadoop.test.MetricsAsserts.getDoubleGauge;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import org.apache.hadoop.util.FakeTimer;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Tests {@link BlockReaderLocalMetrics}'s statistics.
+ */
+public class TestBlockReaderLocalMetrics {
+ private static final long ROLLING_AVERAGES_WINDOW_LENGTH_MS = 1000;
+ private static final int ROLLING_AVERAGE_NUM_WINDOWS = 5;
+ private static final long SLOW_READ_DELAY = 2000;
+ private static final String SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME =
+ "HdfsShortCircuitReads";
+ private static final String SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME =
+ "[ShortCircuitLocalReads]RollingAvgLatencyMs";
+
+ private static final FakeTimer TIMER = new FakeTimer();
+
+ private static HdfsConfiguration conf = new HdfsConfiguration();
+ private static DfsClientConf clientConf;
+
+ static {
+ conf = new HdfsConfiguration();
+ conf.setInt(HdfsClientConfigKeys.Read.ShortCircuit
+ .METRICS_SAMPLING_PERCENTAGE_KEY, 100);
+ clientConf = new DfsClientConf(conf);
+ }
+
+ @Test(timeout = 300_000)
+ public void testSlowShortCircuitReadsStatsRecorded() throws IOException,
+ InterruptedException, TimeoutException {
+
+ BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
+ MutableRollingAverages shortCircuitReadRollingAverages = metrics
+ .getShortCircuitReadRollingAverages();
+ MetricsTestHelper.replaceRollingAveragesScheduler(
+ shortCircuitReadRollingAverages,
+ ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
+ TimeUnit.MILLISECONDS);
+
+ FileChannel dataIn = Mockito.mock(FileChannel.class);
+ Mockito.when(dataIn.read(any(ByteBuffer.class), anyLong())).thenAnswer(
+ new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ TIMER.advance(SLOW_READ_DELAY);
+ return 0;
+ }
+ });
+
+ BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
+ clientConf.getShortCircuitConf(), metrics, TIMER);
+
+ blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
+ blockReaderIoProvider.read(dataIn, any(ByteBuffer.class), anyLong());
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ metrics.collectThreadLocalStates();
+ return shortCircuitReadRollingAverages.getStats(0).size() > 0;
+ }
+ }, 500, 10000);
+
+ MetricsRecordBuilder rb = getMetrics(
+ SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
+ double averageLatency = getDoubleGauge(
+ SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
+ assertTrue("Average Latency of Short Circuit Reads lower than expected",
+ averageLatency >= SLOW_READ_DELAY);
+ }
+
+ @Test(timeout = 300_000)
+ public void testMutlipleBlockReaderIoProviderStats() throws IOException,
+ InterruptedException, TimeoutException {
+
+ BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
+ MutableRollingAverages shortCircuitReadRollingAverages = metrics
+ .getShortCircuitReadRollingAverages();
+ MetricsTestHelper.replaceRollingAveragesScheduler(
+ shortCircuitReadRollingAverages,
+ ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
+ TimeUnit.MILLISECONDS);
+
+ FileChannel dataIn1 = Mockito.mock(FileChannel.class);
+ FileChannel dataIn2 = Mockito.mock(FileChannel.class);
+
+ Mockito.when(dataIn1.read(any(ByteBuffer.class), anyLong())).thenAnswer(
+ new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ TIMER.advance(SLOW_READ_DELAY);
+ return 0;
+ }
+ });
+
+ Mockito.when(dataIn2.read(any(ByteBuffer.class), anyLong())).thenAnswer(
+ new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ TIMER.advance(SLOW_READ_DELAY*3);
+ return 0;
+ }
+ });
+
+ BlockReaderIoProvider blockReaderIoProvider1 = new BlockReaderIoProvider(
+ clientConf.getShortCircuitConf(), metrics, TIMER);
+ BlockReaderIoProvider blockReaderIoProvider2 = new BlockReaderIoProvider(
+ clientConf.getShortCircuitConf(), metrics, TIMER);
+
+ blockReaderIoProvider1.read(dataIn1, any(ByteBuffer.class), anyLong());
+ blockReaderIoProvider2.read(dataIn2, any(ByteBuffer.class), anyLong());
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ metrics.collectThreadLocalStates();
+ return shortCircuitReadRollingAverages.getStats(0).size() > 0;
+ }
+ }, 500, 10000);
+
+ MetricsRecordBuilder rb = getMetrics(
+ SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
+ double averageLatency = getDoubleGauge(
+ SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
+
+ assertTrue("Average Latency of Short Circuit Reads lower than expected",
+ averageLatency >= SLOW_READ_DELAY*2);
+ }
+
+ @Test(timeout = 300_000)
+ public void testSlowShortCircuitReadsAverageLatencyValue() throws IOException,
+ InterruptedException, TimeoutException {
+
+ BlockReaderLocalMetrics metrics = BlockReaderLocalMetrics.create();
+ final MutableRollingAverages shortCircuitReadRollingAverages = metrics
+ .getShortCircuitReadRollingAverages();
+ MetricsTestHelper.replaceRollingAveragesScheduler(
+ shortCircuitReadRollingAverages,
+ ROLLING_AVERAGE_NUM_WINDOWS, ROLLING_AVERAGES_WINDOW_LENGTH_MS,
+ TimeUnit.MILLISECONDS);
+
+ Random random = new Random();
+ FileChannel[] dataIns = new FileChannel[5];
+ long totalDelay = 0;
+
+ for (int i = 0; i < 5; i++) {
+ dataIns[i] = Mockito.mock(FileChannel.class);
+ long delay = SLOW_READ_DELAY * random.nextInt(5);
+ Mockito.when(dataIns[i].read(any(ByteBuffer.class), anyLong()))
+ .thenAnswer(new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ TIMER.advance(delay);
+ return 0;
+ }
+ });
+ totalDelay += delay;
+ }
+ long expectedAvgLatency = totalDelay / 5;
+
+ BlockReaderIoProvider blockReaderIoProvider = new BlockReaderIoProvider(
+ clientConf.getShortCircuitConf(), metrics, TIMER);
+
+ for (int i = 0; i < 5; i++) {
+ blockReaderIoProvider.read(dataIns[i], any(ByteBuffer.class), anyLong());
+ }
+
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ metrics.collectThreadLocalStates();
+ return shortCircuitReadRollingAverages.getStats(0).size() > 0;
+ }
+ }, 500, 10000);
+
+ MetricsRecordBuilder rb = getMetrics(
+ SHORT_CIRCUIT_READ_METRIC_REGISTERED_NAME);
+ double averageLatency = getDoubleGauge(
+ SHORT_CIRCUIT_LOCAL_READS_METRIC_VALUE_FULL_NAME, rb);
+
+ assertTrue("Average Latency of Short Circuit Reads lower than expected",
+ averageLatency >= expectedAvgLatency);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org