You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cd...@apache.org on 2016/10/21 20:03:39 UTC
[2/2] hadoop git commit: HADOOP-13702. Add instrumented
ReadWriteLock. Contributed by Jingcheng Du
HADOOP-13702. Add instrumented ReadWriteLock. Contributed by Jingcheng Du
(cherry picked from commit ae8bccd5090d8b42dae9a8e0c13a9766a7c42ecb)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/25f4327f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/25f4327f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/25f4327f
Branch: refs/heads/branch-2
Commit: 25f4327f0baa947bb99dc808077e3266e0fd982b
Parents: 385c1da
Author: Chris Douglas <cd...@apache.org>
Authored: Fri Oct 21 11:28:11 2016 -0700
Committer: Chris Douglas <cd...@apache.org>
Committed: Fri Oct 21 13:01:08 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/util/InstrumentedLock.java | 197 ++++++++++++++++
.../hadoop/util/InstrumentedReadLock.java | 92 ++++++++
.../hadoop/util/InstrumentedReadWriteLock.java | 58 +++++
.../hadoop/util/InstrumentedWriteLock.java | 54 +++++
.../hadoop/util/TestInstrumentedLock.java | 162 +++++++++++++
.../util/TestInstrumentedReadWriteLock.java | 234 +++++++++++++++++++
.../apache/hadoop/hdfs/InstrumentedLock.java | 185 ---------------
.../datanode/fsdataset/impl/FsDatasetImpl.java | 2 +-
.../hadoop/hdfs/TestInstrumentedLock.java | 166 -------------
9 files changed, 798 insertions(+), 352 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java
new file mode 100644
index 0000000..0520271
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedLock.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a debugging class that can be used by callers to track
+ * whether a specific lock is being held for too long and periodically
+ * log a warning and stack trace, if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ *
+ * A new instance of InstrumentedLock can be created for each object
+ * that needs to be instrumented.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedLock implements Lock {
+
+ private final Lock lock;
+ private final Log logger;
+ private final String name;
+ private final Timer clock;
+
+ /** Minimum gap between two lock warnings. */
+ private final long minLoggingGap;
+ /** Threshold for detecting long lock held time. */
+ private final long lockWarningThreshold;
+
+ // Tracking counters for lock statistics.
+ private volatile long lockAcquireTimestamp;
+ private final AtomicLong lastLogTimestamp;
+ private final AtomicLong warningsSuppressed = new AtomicLong(0);
+
+ /**
+ * Create a instrumented lock instance which logs a warning message
+ * when lock held time is above given threshold.
+ *
+ * @param name the identifier of the lock object
+ * @param logger this class does not have its own logger, will log to the
+ * given logger instead
+ * @param minLoggingGapMs the minimum time gap between two log messages,
+ * this is to avoid spamming to many logs
+ * @param lockWarningThresholdMs the time threshold to view lock held
+ * time as being "too long"
+ */
+ public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
+ long lockWarningThresholdMs) {
+ this(name, logger, new ReentrantLock(),
+ minLoggingGapMs, lockWarningThresholdMs);
+ }
+
+ public InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs) {
+ this(name, logger, lock,
+ minLoggingGapMs, lockWarningThresholdMs, new Timer());
+ }
+
+ @VisibleForTesting
+ InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+ this.name = name;
+ this.lock = lock;
+ this.clock = clock;
+ this.logger = logger;
+ minLoggingGap = minLoggingGapMs;
+ lockWarningThreshold = lockWarningThresholdMs;
+ lastLogTimestamp = new AtomicLong(
+ clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+ }
+
+ @Override
+ public void lock() {
+ lock.lock();
+ startLockTiming();
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ lock.lockInterruptibly();
+ startLockTiming();
+ }
+
+ @Override
+ public boolean tryLock() {
+ if (lock.tryLock()) {
+ startLockTiming();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ if (lock.tryLock(time, unit)) {
+ startLockTiming();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void unlock() {
+ long localLockReleaseTime = clock.monotonicNow();
+ long localLockAcquireTime = lockAcquireTimestamp;
+ lock.unlock();
+ check(localLockAcquireTime, localLockReleaseTime);
+ }
+
+ @Override
+ public Condition newCondition() {
+ return lock.newCondition();
+ }
+
+ @VisibleForTesting
+ void logWarning(long lockHeldTime, long suppressed) {
+ logger.warn(String.format("Lock held time above threshold: " +
+ "lock identifier: %s " +
+ "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+ "The stack trace is: %s" ,
+ name, lockHeldTime, suppressed,
+ StringUtils.getStackTrace(Thread.currentThread())));
+ }
+
+ /**
+ * Starts timing for the instrumented lock.
+ */
+ protected void startLockTiming() {
+ lockAcquireTimestamp = clock.monotonicNow();
+ }
+
+ /**
+ * Log a warning if the lock was held for too long.
+ *
+ * Should be invoked by the caller immediately AFTER releasing the lock.
+ *
+ * @param acquireTime - timestamp just after acquiring the lock.
+ * @param releaseTime - timestamp just before releasing the lock.
+ */
+ protected void check(long acquireTime, long releaseTime) {
+ if (!logger.isWarnEnabled()) {
+ return;
+ }
+
+ final long lockHeldTime = releaseTime - acquireTime;
+ if (lockWarningThreshold - lockHeldTime < 0) {
+ long now;
+ long localLastLogTs;
+ do {
+ now = clock.monotonicNow();
+ localLastLogTs = lastLogTimestamp.get();
+ long deltaSinceLastLog = now - localLastLogTs;
+ // check should print log or not
+ if (deltaSinceLastLog - minLoggingGap < 0) {
+ warningsSuppressed.incrementAndGet();
+ return;
+ }
+ } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
+ long suppressed = warningsSuppressed.getAndSet(0);
+ logWarning(lockHeldTime, suppressed);
+ }
+ }
+
+ protected Lock getLock() {
+ return lock;
+ }
+
+ protected Timer getTimer() {
+ return clock;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java
new file mode 100644
index 0000000..09fd43e
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadLock.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a wrap class of a <tt>ReadLock</tt>.
+ * It extends the class {@link InstrumentedLock}, and can be used to track
+ * whether a specific read lock is being held for too long and log
+ * warnings if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedReadLock extends InstrumentedLock {
+
+ private final ReentrantReadWriteLock readWriteLock;
+
+ /**
+ * Uses the ThreadLocal to keep the time of acquiring locks since
+ * there can be multiple threads that hold the read lock concurrently.
+ */
+ private final ThreadLocal<Long> readLockHeldTimeStamp =
+ new ThreadLocal<Long>() {
+ @Override
+ protected Long initialValue() {
+ return Long.MAX_VALUE;
+ };
+ };
+
+ public InstrumentedReadLock(String name, Log logger,
+ ReentrantReadWriteLock readWriteLock,
+ long minLoggingGapMs, long lockWarningThresholdMs) {
+ this(name, logger, readWriteLock, minLoggingGapMs, lockWarningThresholdMs,
+ new Timer());
+ }
+
+ @VisibleForTesting
+ InstrumentedReadLock(String name, Log logger,
+ ReentrantReadWriteLock readWriteLock,
+ long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+ super(name, logger, readWriteLock.readLock(), minLoggingGapMs,
+ lockWarningThresholdMs, clock);
+ this.readWriteLock = readWriteLock;
+ }
+
+ @Override
+ public void unlock() {
+ boolean needReport = readWriteLock.getReadHoldCount() == 1;
+ long localLockReleaseTime = getTimer().monotonicNow();
+ long localLockAcquireTime = readLockHeldTimeStamp.get();
+ getLock().unlock();
+ if (needReport) {
+ readLockHeldTimeStamp.remove();
+ check(localLockAcquireTime, localLockReleaseTime);
+ }
+ }
+
+ /**
+ * Starts timing for the instrumented read lock.
+ * It records the time to ThreadLocal.
+ */
+ @Override
+ protected void startLockTiming() {
+ if (readWriteLock.getReadHoldCount() == 1) {
+ readLockHeldTimeStamp.set(getTimer().monotonicNow());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
new file mode 100644
index 0000000..62e6b09
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedReadWriteLock.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This is a wrap class of a {@link ReentrantReadWriteLock}.
+ * It implements the interface {@link ReadWriteLock}, and can be used to
+ * create instrumented <tt>ReadLock</tt> and <tt>WriteLock</tt>.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedReadWriteLock implements ReadWriteLock {
+
+ private final Lock readLock;
+ private final Lock writeLock;
+
+ InstrumentedReadWriteLock(boolean fair, String name, Log logger,
+ long minLoggingGapMs, long lockWarningThresholdMs) {
+ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(fair);
+ readLock = new InstrumentedReadLock(name, logger, readWriteLock,
+ minLoggingGapMs, lockWarningThresholdMs);
+ writeLock = new InstrumentedWriteLock(name, logger, readWriteLock,
+ minLoggingGapMs, lockWarningThresholdMs);
+ }
+
+ @Override
+ public Lock readLock() {
+ return readLock;
+ }
+
+ @Override
+ public Lock writeLock() {
+ return writeLock;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedWriteLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedWriteLock.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedWriteLock.java
new file mode 100644
index 0000000..9208c1b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/InstrumentedWriteLock.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a wrap class of a <tt>WriteLock</tt>.
+ * It extends the class {@link InstrumentedLock}, and can be used to track
+ * whether a specific write lock is being held for too long and log
+ * warnings if so.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedWriteLock extends InstrumentedLock {
+
+ public InstrumentedWriteLock(String name, Log logger,
+ ReentrantReadWriteLock readWriteLock,
+ long minLoggingGapMs, long lockWarningThresholdMs) {
+ this(name, logger, readWriteLock, minLoggingGapMs, lockWarningThresholdMs,
+ new Timer());
+ }
+
+ @VisibleForTesting
+ InstrumentedWriteLock(String name, Log logger,
+ ReentrantReadWriteLock readWriteLock,
+ long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+ super(name, logger, readWriteLock.writeLock(), minLoggingGapMs,
+ lockWarningThresholdMs, clock);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java
new file mode 100644
index 0000000..de68fe4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedLock.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * A test class for InstrumentedLock.
+ */
+public class TestInstrumentedLock {
+
+ static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
+
+ @Rule public TestName name = new TestName();
+
+ /**
+ * Test exclusive access of the lock.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testMultipleThread() throws Exception {
+ String testname = name.getMethodName();
+ final InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
+ lock.lock();
+ try {
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Test the correctness with try-with-resource syntax.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testTryWithResourceSyntax() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
+ final Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
+ @Override
+ public void lock() {
+ super.lock();
+ lockThread.set(Thread.currentThread());
+ }
+ @Override
+ public void unlock() {
+ super.unlock();
+ lockThread.set(null);
+ }
+ };
+ AutoCloseableLock acl = new AutoCloseableLock(lock);
+ try (AutoCloseable localLock = acl.acquire()) {
+ assertEquals(acl, localLock);
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertNotEquals(Thread.currentThread(), lockThread.get());
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ assertEquals(Thread.currentThread(), lockThread.get());
+ }
+ assertNull(lockThread.get());
+ }
+
+ /**
+ * Test the lock logs warning when lock held time is greater than threshold
+ * and not log warning otherwise.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testLockLongHoldingReport() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicLong time = new AtomicLong(0);
+ Timer mclock = new Timer() {
+ @Override
+ public long monotonicNow() {
+ return time.get();
+ }
+ };
+ Lock mlock = mock(Lock.class);
+
+ final AtomicLong wlogged = new AtomicLong(0);
+ final AtomicLong wsuppresed = new AtomicLong(0);
+ InstrumentedLock lock = new InstrumentedLock(
+ testname, LOG, mlock, 2000, 300, mclock) {
+ @Override
+ void logWarning(long lockHeldTime, long suppressed) {
+ wlogged.incrementAndGet();
+ wsuppresed.set(suppressed);
+ }
+ };
+
+ // do not log warning when the lock held time is short
+ lock.lock(); // t = 0
+ time.set(200);
+ lock.unlock(); // t = 200
+ assertEquals(0, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ lock.lock(); // t = 200
+ time.set(700);
+ lock.unlock(); // t = 700
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // despite the lock held time is greater than threshold
+ // suppress the log warning due to the logging gap
+ // (not recorded in wsuppressed until next log message)
+ lock.lock(); // t = 700
+ time.set(1100);
+ lock.unlock(); // t = 1100
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // log a warning message when the lock held time is greater the threshold
+ // and the logging time gap is satisfied. Also should display suppressed
+ // previous warnings.
+ time.set(2400);
+ lock.lock(); // t = 2400
+ time.set(2800);
+ lock.unlock(); // t = 2800
+ assertEquals(2, wlogged.get());
+ assertEquals(1, wsuppresed.get());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java
new file mode 100644
index 0000000..eeefa88
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestInstrumentedReadWriteLock.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * A test class for InstrumentedReadLock and InstrumentedWriteLock.
+ */
+public class TestInstrumentedReadWriteLock {
+
+ static final Log LOG = LogFactory.getLog(TestInstrumentedReadWriteLock.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ /**
+ * Tests exclusive access of the write lock.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testWriteLock() throws Exception {
+ String testname = name.getMethodName();
+ final ThreadLocal<Boolean> locked = new ThreadLocal<Boolean>();
+ locked.set(Boolean.FALSE);
+ InstrumentedReadWriteLock readWriteLock = new InstrumentedReadWriteLock(
+ true, testname, LOG, 2000, 300);
+ final AutoCloseableLock writeLock = new AutoCloseableLock(
+ readWriteLock.writeLock()) {
+ @Override
+ public AutoCloseableLock acquire() {
+ AutoCloseableLock lock = super.acquire();
+ locked.set(Boolean.TRUE);
+ return lock;
+ }
+
+ @Override
+ public void release() {
+ super.release();
+ locked.set(Boolean.FALSE);
+ }
+ };
+ final AutoCloseableLock readLock = new AutoCloseableLock(
+ readWriteLock.readLock());
+ try (AutoCloseableLock lock = writeLock.acquire()) {
+ Thread competingWriteThread = new Thread() {
+ @Override
+ public void run() {
+ assertFalse(writeLock.tryLock());
+ }
+ };
+ competingWriteThread.start();
+ competingWriteThread.join();
+ Thread competingReadThread = new Thread() {
+ @Override
+ public void run() {
+ assertFalse(readLock.tryLock());
+ };
+ };
+ competingReadThread.start();
+ competingReadThread.join();
+ }
+ assertFalse(locked.get());
+ locked.remove();
+ }
+
+ /**
+ * Tests the read lock.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testReadLock() throws Exception {
+ String testname = name.getMethodName();
+ InstrumentedReadWriteLock readWriteLock = new InstrumentedReadWriteLock(
+ true, testname, LOG, 2000, 300);
+ final AutoCloseableLock readLock = new AutoCloseableLock(
+ readWriteLock.readLock());
+ final AutoCloseableLock writeLock = new AutoCloseableLock(
+ readWriteLock.writeLock());
+ try (AutoCloseableLock lock = readLock.acquire()) {
+ Thread competingReadThread = new Thread() {
+ @Override
+ public void run() {
+ assertTrue(readLock.tryLock());
+ readLock.release();
+ }
+ };
+ competingReadThread.start();
+ competingReadThread.join();
+ Thread competingWriteThread = new Thread() {
+ @Override
+ public void run() {
+ assertFalse(writeLock.tryLock());
+ }
+ };
+ competingWriteThread.start();
+ competingWriteThread.join();
+ }
+ }
+
+ /**
+ * Tests the warning when the read lock is held longer than threshold.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testReadLockLongHoldingReport() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicLong time = new AtomicLong(0);
+ Timer mclock = new Timer() {
+ @Override
+ public long monotonicNow() {
+ return time.get();
+ }
+ };
+
+ final AtomicLong wlogged = new AtomicLong(0);
+ final AtomicLong wsuppresed = new AtomicLong(0);
+ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
+ InstrumentedReadLock readLock = new InstrumentedReadLock(testname, LOG,
+ readWriteLock, 2000, 300, mclock) {
+ @Override
+ protected void logWarning(long lockHeldTime, long suppressed) {
+ wlogged.incrementAndGet();
+ wsuppresed.set(suppressed);
+ }
+ };
+
+ readLock.lock(); // t = 0
+ time.set(100);
+ readLock.unlock(); // t = 100
+ assertEquals(0, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ readLock.lock(); // t = 100
+ time.set(500);
+ readLock.unlock(); // t = 500
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // the suppress counting is only changed when
+ // log is needed in the test
+ readLock.lock(); // t = 500
+ time.set(900);
+ readLock.unlock(); // t = 900
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ readLock.lock(); // t = 900
+ time.set(3000);
+ readLock.unlock(); // t = 3000
+ assertEquals(2, wlogged.get());
+ assertEquals(1, wsuppresed.get());
+ }
+
+ /**
+ * Tests the warning when the write lock is held longer than threshold.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testWriteLockLongHoldingReport() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicLong time = new AtomicLong(0);
+ Timer mclock = new Timer() {
+ @Override
+ public long monotonicNow() {
+ return time.get();
+ }
+ };
+
+ final AtomicLong wlogged = new AtomicLong(0);
+ final AtomicLong wsuppresed = new AtomicLong(0);
+ ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(true);
+ InstrumentedWriteLock writeLock = new InstrumentedWriteLock(testname, LOG,
+ readWriteLock, 2000, 300, mclock) {
+ @Override
+ protected void logWarning(long lockHeldTime, long suppressed) {
+ wlogged.incrementAndGet();
+ wsuppresed.set(suppressed);
+ }
+ };
+
+ writeLock.lock(); // t = 0
+ time.set(100);
+ writeLock.unlock(); // t = 100
+ assertEquals(0, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ writeLock.lock(); // t = 100
+ time.set(500);
+ writeLock.unlock(); // t = 500
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // the suppress counting is only changed when
+ // log is needed in the test
+ writeLock.lock(); // t = 500
+ time.set(900);
+ writeLock.unlock(); // t = 900
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ writeLock.lock(); // t = 900
+ time.set(3000);
+ writeLock.unlock(); // t = 3000
+ assertEquals(2, wlogged.get());
+ assertEquals(1, wsuppresed.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
deleted file mode 100644
index 6279e95..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/InstrumentedLock.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.commons.logging.Log;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.Timer;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This is a debugging class that can be used by callers to track
- * whether a specifc lock is being held for too long and periodically
- * log a warning and stack trace, if so.
- *
- * The logged warnings are throttled so that logs are not spammed.
- *
- * A new instance of InstrumentedLock can be created for each object
- * that needs to be instrumented.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class InstrumentedLock implements Lock {
-
- private final Lock lock;
- private final Log logger;
- private final String name;
- private final Timer clock;
-
- /** Minimum gap between two lock warnings. */
- private final long minLoggingGap;
- /** Threshold for detecting long lock held time. */
- private final long lockWarningThreshold;
-
- // Tracking counters for lock statistics.
- private volatile long lockAcquireTimestamp;
- private final AtomicLong lastLogTimestamp;
- private final AtomicLong warningsSuppressed = new AtomicLong(0);
-
- /**
- * Create a instrumented lock instance which logs a warning message
- * when lock held time is above given threshold.
- *
- * @param name the identifier of the lock object
- * @param logger this class does not have its own logger, will log to the
- * given logger instead
- * @param minLoggingGapMs the minimum time gap between two log messages,
- * this is to avoid spamming to many logs
- * @param lockWarningThresholdMs the time threshold to view lock held
- * time as being "too long"
- */
- public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
- long lockWarningThresholdMs) {
- this(name, logger, new ReentrantLock(),
- minLoggingGapMs, lockWarningThresholdMs);
- }
-
- public InstrumentedLock(String name, Log logger, Lock lock,
- long minLoggingGapMs, long lockWarningThresholdMs) {
- this(name, logger, lock,
- minLoggingGapMs, lockWarningThresholdMs, new Timer());
- }
-
- @VisibleForTesting
- InstrumentedLock(String name, Log logger, Lock lock,
- long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
- this.name = name;
- this.lock = lock;
- this.clock = clock;
- this.logger = logger;
- minLoggingGap = minLoggingGapMs;
- lockWarningThreshold = lockWarningThresholdMs;
- lastLogTimestamp = new AtomicLong(
- clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
- }
-
- @Override
- public void lock() {
- lock.lock();
- lockAcquireTimestamp = clock.monotonicNow();
- }
-
- @Override
- public void lockInterruptibly() throws InterruptedException {
- lock.lockInterruptibly();
- lockAcquireTimestamp = clock.monotonicNow();
- }
-
- @Override
- public boolean tryLock() {
- if (lock.tryLock()) {
- lockAcquireTimestamp = clock.monotonicNow();
- return true;
- }
- return false;
- }
-
- @Override
- public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
- if (lock.tryLock(time, unit)) {
- lockAcquireTimestamp = clock.monotonicNow();
- return true;
- }
- return false;
- }
-
- @Override
- public void unlock() {
- long localLockReleaseTime = clock.monotonicNow();
- long localLockAcquireTime = lockAcquireTimestamp;
- lock.unlock();
- check(localLockAcquireTime, localLockReleaseTime);
- }
-
- @Override
- public Condition newCondition() {
- return lock.newCondition();
- }
-
- @VisibleForTesting
- void logWarning(long lockHeldTime, long suppressed) {
- logger.warn(String.format("Lock held time above threshold: " +
- "lock identifier: %s " +
- "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
- "The stack trace is: %s" ,
- name, lockHeldTime, suppressed,
- StringUtils.getStackTrace(Thread.currentThread())));
- }
-
- /**
- * Log a warning if the lock was held for too long.
- *
- * Should be invoked by the caller immediately AFTER releasing the lock.
- *
- * @param acquireTime - timestamp just after acquiring the lock.
- * @param releaseTime - timestamp just before releasing the lock.
- */
- private void check(long acquireTime, long releaseTime) {
- if (!logger.isWarnEnabled()) {
- return;
- }
-
- final long lockHeldTime = releaseTime - acquireTime;
- if (lockWarningThreshold - lockHeldTime < 0) {
- long now;
- long localLastLogTs;
- do {
- now = clock.monotonicNow();
- localLastLogTs = lastLogTimestamp.get();
- long deltaSinceLastLog = now - localLastLogTs;
- // check should print log or not
- if (deltaSinceLastLog - minLoggingGap < 0) {
- warningsSuppressed.incrementAndGet();
- return;
- }
- } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
- long suppressed = warningsSuppressed.getAndSet(0);
- logWarning(lockHeldTime, suppressed);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index a18c677..1322e24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -64,7 +64,6 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
-import org.apache.hadoop.hdfs.InstrumentedLock;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -121,6 +120,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
+import org.apache.hadoop.util.InstrumentedLock;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/25f4327f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
deleted file mode 100644
index 1d1a42b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs;
-
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-
-import org.apache.hadoop.util.AutoCloseableLock;
-import org.apache.hadoop.util.Timer;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import static org.mockito.Mockito.*;
-import static org.junit.Assert.*;
-
-/**
- * A test class for InstrumentedLock.
- */
-public class TestInstrumentedLock {
-
- static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
-
- @Rule public TestName name = new TestName();
-
- /**
- * Test exclusive access of the lock.
- * @throws Exception
- */
- @Test(timeout=10000)
- public void testMultipleThread() throws Exception {
- String testname = name.getMethodName();
- final InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
- lock.lock();
- try {
- Thread competingThread = new Thread() {
- @Override
- public void run() {
- assertFalse(lock.tryLock());
- }
- };
- competingThread.start();
- competingThread.join();
- } finally {
- lock.unlock();
- }
- }
-
- /**
- * Test the correctness with try-with-resource syntax.
- * @throws Exception
- */
- @Test(timeout=10000)
- public void testTryWithResourceSyntax() throws Exception {
- String testname = name.getMethodName();
- final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
- final Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
- @Override
- public void lock() {
- super.lock();
- lockThread.set(Thread.currentThread());
- }
- @Override
- public void unlock() {
- super.unlock();
- lockThread.set(null);
- }
- };
- AutoCloseableLock acl = new AutoCloseableLock(lock);
- try (AutoCloseable localLock = acl.acquire()) {
- assertEquals(acl, localLock);
- Thread competingThread = new Thread() {
- @Override
- public void run() {
- assertNotEquals(Thread.currentThread(), lockThread.get());
- assertFalse(lock.tryLock());
- }
- };
- competingThread.start();
- competingThread.join();
- assertEquals(Thread.currentThread(), lockThread.get());
- }
- assertNull(lockThread.get());
- }
-
- /**
- * Test the lock logs warning when lock held time is greater than threshold
- * and not log warning otherwise.
- * @throws Exception
- */
- @Test(timeout=10000)
- public void testLockLongHoldingReport() throws Exception {
- String testname = name.getMethodName();
- final AtomicLong time = new AtomicLong(0);
- Timer mclock = new Timer() {
- @Override
- public long monotonicNow() {
- return time.get();
- }
- };
- Lock mlock = mock(Lock.class);
-
- final AtomicLong wlogged = new AtomicLong(0);
- final AtomicLong wsuppresed = new AtomicLong(0);
- InstrumentedLock lock = new InstrumentedLock(
- testname, LOG, mlock, 2000, 300, mclock) {
- @Override
- void logWarning(long lockHeldTime, long suppressed) {
- wlogged.incrementAndGet();
- wsuppresed.set(suppressed);
- }
- };
-
- // do not log warning when the lock held time is short
- lock.lock(); // t = 0
- time.set(200);
- lock.unlock(); // t = 200
- assertEquals(0, wlogged.get());
- assertEquals(0, wsuppresed.get());
-
- lock.lock(); // t = 200
- time.set(700);
- lock.unlock(); // t = 700
- assertEquals(1, wlogged.get());
- assertEquals(0, wsuppresed.get());
-
- // despite the lock held time is greater than threshold
- // suppress the log warning due to the logging gap
- // (not recorded in wsuppressed until next log message)
- lock.lock(); // t = 700
- time.set(1100);
- lock.unlock(); // t = 1100
- assertEquals(1, wlogged.get());
- assertEquals(0, wsuppresed.get());
-
- // log a warning message when the lock held time is greater the threshold
- // and the logging time gap is satisfied. Also should display suppressed
- // previous warnings.
- time.set(2400);
- lock.lock(); // t = 2400
- time.set(2800);
- lock.unlock(); // t = 2800
- assertEquals(2, wlogged.get());
- assertEquals(1, wsuppresed.get());
- }
-
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org