You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dr...@apache.org on 2016/09/13 08:29:46 UTC
[32/50] [abbrv] hadoop git commit: HDFS-10742. Measure lock time in
FsDatasetImpl. Contributed by Chen Liang
HDFS-10742. Measure lock time in FsDatasetImpl. Contributed by Chen Liang
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/011f3b24
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/011f3b24
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/011f3b24
Branch: refs/heads/HDFS-10285
Commit: 011f3b24d4bfda505a90ab5b5576916a41f869c5
Parents: 1192781
Author: Chris Douglas <cd...@apache.org>
Authored: Thu Sep 8 17:52:02 2016 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Thu Sep 8 17:53:40 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/util/AutoCloseableLock.java | 28 ++-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +
.../apache/hadoop/hdfs/InstrumentedLock.java | 185 +++++++++++++++++++
.../datanode/fsdataset/impl/FsDatasetImpl.java | 10 +-
.../src/main/resources/hdfs-default.xml | 8 +
.../hadoop/hdfs/TestInstrumentedLock.java | 166 +++++++++++++++++
6 files changed, 395 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/011f3b24/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
index 2aa8578..d920bc6 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/AutoCloseableLock.java
@@ -17,22 +17,33 @@
*/
package org.apache.hadoop.util;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This is a wrap class of a ReentrantLock. Extending AutoCloseable
* interface such that the users can use a try-with-resource syntax.
*/
public class AutoCloseableLock implements AutoCloseable {
- private final ReentrantLock lock;
+ private final Lock lock;
/**
* Creates an instance of {@code AutoCloseableLock}, initializes
- * the underlying {@code ReentrantLock} object.
+ * the underlying lock instance with a new {@code ReentrantLock}.
*/
public AutoCloseableLock() {
- this.lock = new ReentrantLock();
+ this(new ReentrantLock());
+ }
+
+ /**
+ * Wrap provided Lock instance.
+ * @param lock Lock instance to wrap in AutoCloseable API.
+ */
+ public AutoCloseableLock(Lock lock) {
+ this.lock = lock;
}
/**
@@ -86,7 +97,7 @@ public class AutoCloseableLock implements AutoCloseable {
/**
* A wrapper method that makes a call to {@code tryLock()} of
- * the underlying {@code ReentrantLock} object.
+ * the underlying {@code Lock} object.
*
* If the lock is not held by another thread, acquires the lock, set the
* hold count to one and returns {@code true}.
@@ -116,7 +127,12 @@ public class AutoCloseableLock implements AutoCloseable {
* @return {@code true} if any thread holds this lock and
* {@code false} otherwise
*/
- public boolean isLocked() {
- return lock.isLocked();
+ @VisibleForTesting
+ boolean isLocked() {
+ if (lock instanceof ReentrantLock) {
+ return ((ReentrantLock)lock).isLocked();
+ }
+ throw new UnsupportedOperationException();
}
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/011f3b24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 7bdfd44..caf6b60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -419,6 +419,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_KEY =
"dfs.namenode.read-lock-reporting-threshold-ms";
public static final long DFS_NAMENODE_READ_LOCK_REPORTING_THRESHOLD_MS_DEFAULT = 5000L;
+ // Interval within which consecutive lock warnings are suppressed
+ public static final String DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY =
+ "dfs.lock.suppress.warning.interval";
+ public static final long DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT =
+ 10000; //ms
public static final String DFS_UPGRADE_DOMAIN_FACTOR = "dfs.namenode.upgrade.domain.factor";
public static final int DFS_UPGRADE_DOMAIN_FACTOR_DEFAULT = DFS_REPLICATION_DEFAULT;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/011f3b24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
new file mode 100644
index 0000000..6279e95
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Timer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a debugging class that can be used by callers to track
+ * whether a specific lock is being held for too long and periodically
+ * log a warning and stack trace, if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ *
+ * A new instance of InstrumentedLock can be created for each object
+ * that needs to be instrumented.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedLock implements Lock {
+
+ private final Lock lock;
+ private final Log logger;
+ private final String name;
+ private final Timer clock;
+
+ /** Minimum gap between two lock warnings, in milliseconds. */
+ private final long minLoggingGap;
+ /** Threshold for detecting long lock held time, in milliseconds. */
+ private final long lockWarningThreshold;
+
+ // Lock statistics; lockAcquireTimestamp is overwritten on every acquisition.
+ private volatile long lockAcquireTimestamp;
+ private final AtomicLong lastLogTimestamp;
+ private final AtomicLong warningsSuppressed = new AtomicLong(0);
+
+ /**
+ * Create an instrumented lock instance which logs a warning message
+ * when lock held time is above given threshold. Wraps a new ReentrantLock.
+ *
+ * @param name the identifier of the lock object
+ * @param logger this class does not have its own logger, will log to the
+ * given logger instead
+ * @param minLoggingGapMs the minimum time gap between two log messages,
+ * this is to avoid spamming too many logs
+ * @param lockWarningThresholdMs the time threshold to view lock held
+ * time as being "too long"
+ */
+ public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
+ long lockWarningThresholdMs) {
+ this(name, logger, new ReentrantLock(),
+ minLoggingGapMs, lockWarningThresholdMs);
+ }
+
+ public InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs) {
+ this(name, logger, lock,
+ minLoggingGapMs, lockWarningThresholdMs, new Timer());
+ }
+
+ @VisibleForTesting
+ InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+ this.name = name;
+ this.lock = lock;
+ this.clock = clock;
+ this.logger = logger;
+ minLoggingGap = minLoggingGapMs;
+ lockWarningThreshold = lockWarningThresholdMs;
+ lastLogTimestamp = new AtomicLong( // backdated: first warning is not throttled
+ clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+ }
+
+ @Override
+ public void lock() {
+ lock.lock();
+ lockAcquireTimestamp = clock.monotonicNow(); // stamped after acquisition
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ lock.lockInterruptibly();
+ lockAcquireTimestamp = clock.monotonicNow();
+ }
+
+ @Override
+ public boolean tryLock() {
+ if (lock.tryLock()) {
+ lockAcquireTimestamp = clock.monotonicNow();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ if (lock.tryLock(time, unit)) {
+ lockAcquireTimestamp = clock.monotonicNow();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void unlock() {
+ long localLockReleaseTime = clock.monotonicNow();
+ long localLockAcquireTime = lockAcquireTimestamp; // read before releasing
+ lock.unlock();
+ check(localLockAcquireTime, localLockReleaseTime); // report after release
+ }
+
+ @Override
+ public Condition newCondition() {
+ return lock.newCondition();
+ }
+
+ @VisibleForTesting
+ void logWarning(long lockHeldTime, long suppressed) {
+ logger.warn(String.format("Lock held time above threshold: " +
+ "lock identifier: %s " +
+ "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+ "The stack trace is: %s" ,
+ name, lockHeldTime, suppressed,
+ StringUtils.getStackTrace(Thread.currentThread())));
+ }
+
+ /**
+ * Log a warning if the lock was held for too long.
+ * Warnings within minLoggingGap of the last one are counted, not logged.
+ * Should be invoked by the caller immediately AFTER releasing the lock.
+ *
+ * @param acquireTime - timestamp just after acquiring the lock.
+ * @param releaseTime - timestamp just before releasing the lock.
+ */
+ private void check(long acquireTime, long releaseTime) {
+ if (!logger.isWarnEnabled()) {
+ return;
+ }
+
+ final long lockHeldTime = releaseTime - acquireTime;
+ if (lockWarningThreshold - lockHeldTime < 0) { // held longer than threshold
+ long now;
+ long localLastLogTs;
+ do {
+ now = clock.monotonicNow();
+ localLastLogTs = lastLogTimestamp.get();
+ long deltaSinceLastLog = now - localLastLogTs;
+ // throttle: count the warning instead of logging it if the gap is too short
+ if (deltaSinceLastLog - minLoggingGap < 0) {
+ warningsSuppressed.incrementAndGet();
+ return;
+ }
+ } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now)); // retry if changed
+ long suppressed = warningsSuppressed.getAndSet(0);
+ logWarning(lockHeldTime, suppressed);
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/011f3b24/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ffd2f8a..e5da0e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -40,6 +40,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -60,6 +61,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.InstrumentedLock;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -278,7 +280,13 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
- this.datasetLock = new AutoCloseableLock();
+ this.datasetLock = new AutoCloseableLock(
+ new InstrumentedLock(getClass().getName(), LOG,
+ conf.getTimeDuration(
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS),
+ 300));
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/011f3b24/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 3f78233..3a5de3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4273,4 +4273,12 @@
a plan.
</description>
</property>
+
+ <property>
+ <name>dfs.lock.suppress.warning.interval</name>
+ <value>10s</value>
+ <description>Instrumentation reporting long critical sections will suppress
+ consecutive warnings within this interval.</description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/011f3b24/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
new file mode 100644
index 0000000..f470688
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * A test class for InstrumentedLock.
+ */
+public class TestInstrumentedLock {
+
+ static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
+
+ @Rule public TestName name = new TestName();
+
+ /**
+ * Test exclusive access of the lock: a competing thread must fail tryLock().
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testMultipleThread() throws Exception {
+ String testname = name.getMethodName();
+ InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
+ lock.lock();
+ try {
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Test the correctness with try-with-resource syntax.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testTryWithResourceSyntax() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
+ Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
+ @Override
+ public void lock() {
+ super.lock();
+ lockThread.set(Thread.currentThread());
+ }
+ @Override
+ public void unlock() {
+ super.unlock();
+ lockThread.set(null);
+ }
+ };
+ AutoCloseableLock acl = new AutoCloseableLock(lock);
+ try (AutoCloseable localLock = acl.acquire()) {
+ assertEquals(acl, localLock);
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertNotEquals(Thread.currentThread(), lockThread.get());
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ assertEquals(Thread.currentThread(), lockThread.get());
+ }
+ assertNull(lockThread.get());
+ }
+
+ /**
+ * Test that the lock logs a warning when the lock held time is greater
+ * than the threshold and does not log a warning otherwise.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testLockLongHoldingReport() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicLong time = new AtomicLong(0);
+ Timer mclock = new Timer() {
+ @Override
+ public long monotonicNow() {
+ return time.get();
+ }
+ };
+ Lock mlock = mock(Lock.class);
+
+ final AtomicLong wlogged = new AtomicLong(0);
+ final AtomicLong wsuppresed = new AtomicLong(0);
+ InstrumentedLock lock = new InstrumentedLock(
+ testname, LOG, mlock, 2000, 300, mclock) { // gap=2000ms, threshold=300ms
+ @Override
+ void logWarning(long lockHeldTime, long suppressed) {
+ wlogged.incrementAndGet();
+ wsuppresed.set(suppressed);
+ }
+ };
+
+ // do not log warning when the lock held time is short
+ lock.lock(); // t = 0
+ time.set(200);
+ lock.unlock(); // t = 200
+ assertEquals(0, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ lock.lock(); // t = 200
+ time.set(700);
+ lock.unlock(); // t = 700
+ assertEquals(1, wlogged.get()); // held 500 > 300: first warning is logged
+ assertEquals(0, wsuppresed.get());
+
+ // although the lock held time is greater than the threshold,
+ // the warning is suppressed due to the logging gap
+ // (not recorded in wsuppresed until the next log message)
+ lock.lock(); // t = 700
+ time.set(1100);
+ lock.unlock(); // t = 1100
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // log a warning message when the lock held time is greater than the
+ // threshold and the logging time gap is satisfied. Also should display
+ // the number of previously suppressed warnings.
+ time.set(2400);
+ lock.lock(); // t = 2400
+ time.set(2800);
+ lock.unlock(); // t = 2800
+ assertEquals(2, wlogged.get());
+ assertEquals(1, wsuppresed.get());
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org