You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2017/11/04 00:30:14 UTC

cassandra git commit: Allow only one concurrent call to StatusLogger

Repository: cassandra
Updated Branches:
  refs/heads/trunk 260846685 -> 5b09543f6


Allow only one concurrent call to StatusLogger

patch by mszczygiel; reviewed by jasobrown for CASSANDRA-12182


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/5b09543f
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/5b09543f
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/5b09543f

Branch: refs/heads/trunk
Commit: 5b09543f64eafb1344f7814a80b73d312d5bbc37
Parents: 2608466
Author: mszczygiel <my...@gmail.com>
Authored: Tue Oct 31 22:46:56 2017 +0100
Committer: Jason Brown <ja...@gmail.com>
Committed: Fri Nov 3 17:28:05 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../apache/cassandra/utils/StatusLogger.java    |  25 ++-
 .../cassandra/utils/StatusLoggerTest.java       | 160 +++++++++++++++++++
 3 files changed, 185 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b09543f/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 71f4b1d..e214177 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Allow only one concurrent call to StatusLogger (CASSANDRA-12182)
  * Refactoring to specialised functional interfaces (CASSANDRA-13982)
  * Speculative retry should allow more friendly params (CASSANDRA-13876)
  * Throw exception if we send/receive repair messages to incompatible nodes (CASSANDRA-13944)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b09543f/src/java/org/apache/cassandra/utils/StatusLogger.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/StatusLogger.java b/src/java/org/apache/cassandra/utils/StatusLogger.java
index c33190b..9f9d869 100644
--- a/src/java/org/apache/cassandra/utils/StatusLogger.java
+++ b/src/java/org/apache/cassandra/utils/StatusLogger.java
@@ -19,11 +19,13 @@ package org.apache.cassandra.utils;
 
 import java.lang.management.ManagementFactory;
 import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.management.*;
 
 import org.apache.cassandra.cache.*;
 
 import org.apache.cassandra.metrics.ThreadPoolMetrics;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,10 +39,31 @@ import org.apache.cassandra.service.CacheService;
 public class StatusLogger
 {
     private static final Logger logger = LoggerFactory.getLogger(StatusLogger.class);
-
+    private static final ReentrantLock busyMonitor = new ReentrantLock();
 
     public static void log()
     {
+        // avoid logging more than once at the same time. throw away any attempts to log concurrently, as it would be
+        // confusing and noisy for operators - and don't bother logging again, immediately as it'll just be the same data
+        if (busyMonitor.tryLock())
+        {
+            try
+            {
+                logStatus();
+            }
+            finally
+            {
+                busyMonitor.unlock();
+            }
+        }
+        else
+        {
+            logger.trace("StatusLogger is busy");
+        }
+    }
+
+    private static void logStatus()
+    {
         MBeanServer server = ManagementFactory.getPlatformMBeanServer();
 
         // everything from o.a.c.concurrent

http://git-wip-us.apache.org/repos/asf/cassandra/blob/5b09543f/test/unit/org/apache/cassandra/utils/StatusLoggerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/utils/StatusLoggerTest.java b/test/unit/org/apache/cassandra/utils/StatusLoggerTest.java
new file mode 100644
index 0000000..878e6e8
--- /dev/null
+++ b/test/unit/org/apache/cassandra/utils/StatusLoggerTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.utils;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Range;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.AppenderBase;
+import org.apache.cassandra.cql3.CQLTester;
+
+import static com.google.common.collect.Lists.newArrayList;
+import static java.util.stream.Collectors.groupingBy;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class StatusLoggerTest extends CQLTester
+{
+    private static final Logger log = LoggerFactory.getLogger(StatusLoggerTest.class);
+
+    @Test
+    public void testStatusLoggerPrintsStatusOnlyOnceWhenInvokedConcurrently() throws Exception
+    {
+        ch.qos.logback.classic.Logger logger = (ch.qos.logback.classic.Logger) LoggerFactory.getLogger(StatusLogger.class);
+        InMemoryAppender inMemoryAppender = new InMemoryAppender();
+        logger.addAppender(inMemoryAppender);
+        logger.setLevel(Level.TRACE);
+        try
+        {
+            submitTwoLogRequestsConcurrently();
+            verifyOnlySingleStatusWasAppendedConcurrently(inMemoryAppender.events);
+        }
+        finally
+        {
+            assertTrue("Could not remove in memory appender", logger.detachAppender(inMemoryAppender));
+        }
+    }
+
+    private void submitTwoLogRequestsConcurrently() throws InterruptedException
+    {
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        executorService.submit(StatusLogger::log);
+        executorService.submit(StatusLogger::log);
+        executorService.shutdown();
+        executorService.awaitTermination(1, TimeUnit.SECONDS);
+    }
+
+    private void verifyOnlySingleStatusWasAppendedConcurrently(List<ILoggingEvent> events)
+    {
+        Map<String, List<ILoggingEvent>> eventsByThread = events.stream().collect(groupingBy(ILoggingEvent::getThreadName));
+        List<String> threadNames = newArrayList(eventsByThread.keySet());
+
+        assertEquals("Expected events from 2 threads only", 2, threadNames.size());
+
+        List<ILoggingEvent> firstThreadEvents = eventsByThread.get(threadNames.get(0));
+        List<ILoggingEvent> secondThreadEvents = eventsByThread.get(threadNames.get(1));
+
+        assertTrue("Expected at least one event from the first thread", firstThreadEvents.size() >= 1);
+        assertTrue("Expected at least one event from the second thread", secondThreadEvents.size() >= 1);
+
+        if (areDisjunctive(firstThreadEvents, secondThreadEvents))
+        {
+            log.debug("Event time ranges are disjunctive - log invocations were made one after another");
+        }
+        else
+        {
+            verifyStatusWasPrintedAndBusyEventOccured(firstThreadEvents, secondThreadEvents);
+        }
+    }
+
+    private boolean areDisjunctive(List<ILoggingEvent> firstThreadEvents, List<ILoggingEvent> secondThreadEvents)
+    {
+        Range<Long> firstThreadTimeRange = timestampsRange(firstThreadEvents);
+        Range<Long> secondThreadTimeRange = timestampsRange(secondThreadEvents);
+        boolean connected = firstThreadTimeRange.isConnected(secondThreadTimeRange);
+        boolean disjunctive = !connected || firstThreadTimeRange.intersection(secondThreadTimeRange).isEmpty();
+        log.debug("Time ranges {}, {}, disjunctive={}", firstThreadTimeRange, secondThreadTimeRange, disjunctive);
+        return disjunctive;
+    }
+
+    private Range<Long> timestampsRange(List<ILoggingEvent> events)
+    {
+        List<Long> timestamps = events.stream().map(ILoggingEvent::getTimeStamp).collect(Collectors.toList());
+        Long min = timestamps.stream().min(Comparator.naturalOrder()).get();
+        Long max = timestamps.stream().max(Comparator.naturalOrder()).get();
+        // It's open on one side to cover a case when second status starts printing at the same timestamp that previous one was finished
+        return Range.closedOpen(min, max);
+    }
+
+    private void verifyStatusWasPrintedAndBusyEventOccured(List<ILoggingEvent> firstThreadEvents, List<ILoggingEvent> secondThreadEvents)
+    {
+        if (firstThreadEvents.size() > 1 && secondThreadEvents.size() > 1)
+        {
+            log.error("Both event lists contain more than one entry. First = {}, Second = {}", firstThreadEvents, secondThreadEvents);
+            fail("More that one status log was appended concurrently");
+        }
+        else if (firstThreadEvents.size() <= 1 && secondThreadEvents.size() <= 1)
+        {
+            log.error("No status log was recorded. First = {}, Second = {}", firstThreadEvents, secondThreadEvents);
+            fail("Status log was not appended");
+        }
+        else
+        {
+            log.info("Checking if logger was busy. First = {}, Second = {}", firstThreadEvents, secondThreadEvents);
+            assertTrue("One 'logger busy' entry was expected",
+                       isLoggerBusyTheOnlyEvent(firstThreadEvents) || isLoggerBusyTheOnlyEvent(secondThreadEvents));
+        }
+    }
+
+    private boolean isLoggerBusyTheOnlyEvent(List<ILoggingEvent> events)
+    {
+        return events.size() == 1 &&
+               events.get(0).getMessage().equals("StatusLogger is busy") &&
+               events.get(0).getLevel() == Level.TRACE;
+    }
+
+    private static class InMemoryAppender extends AppenderBase<ILoggingEvent>
+    {
+        private final List<ILoggingEvent> events = newArrayList();
+
+        private InMemoryAppender()
+        {
+            start();
+        }
+
+        @Override
+        protected synchronized void append(ILoggingEvent event)
+        {
+            events.add(event);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@cassandra.apache.org
For additional commands, e-mail: commits-help@cassandra.apache.org