You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by ad...@apache.org on 2018/04/04 13:58:18 UTC

svn commit: r1828345 - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/ test/java/org/apache/jackrabbit/oak/segment/

Author: adulceanu
Date: Wed Apr  4 13:58:18 2018
New Revision: 1828345

URL: http://svn.apache.org/viewvc?rev=1828345&view=rev
Log:
OAK-7384 - SegmentNodeStoreStats should expose stats for previous minute per thread group

Modified:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java?rev=1828345&r1=1828344&r2=1828345&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/CommitsTracker.java Wed Apr  4 13:58:18 2018
@@ -19,6 +19,9 @@
 
 package org.apache.jackrabbit.oak.segment;
 
+import static java.util.concurrent.TimeUnit.MINUTES;
+
+import java.io.Closeable;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +29,7 @@ import java.util.concurrent.ConcurrentMa
 import java.util.stream.Stream;
 
 import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import org.apache.jackrabbit.oak.segment.file.Scheduler;
 
 /**
  * A simple tracker for the source of commits (writes) in
@@ -36,19 +40,28 @@ import com.googlecode.concurrentlinkedha
  * currently waiting on the commit semaphore
  * </ul>
  * 
- * This class delegates thread-safety to its underlying state variables. 
+ * This class delegates thread-safety to its underlying state variables.
  */
-class CommitsTracker {
-    private volatile boolean collectStackTraces;
-
+class CommitsTracker implements Closeable {
+    private final boolean collectStackTraces;
+    private final String[] threadGroups;
     private final ConcurrentMap<String, String> queuedWritersMap;
-    private final ConcurrentMap<String, Long> commitsCountMap;
+    private final ConcurrentMap<String, Long> commitsCountPerThreadGroup;
+    private final ConcurrentMap<String, Long> commitsCountOtherThreads;
+    private final ConcurrentMap<String, Long> commitsCountPerThreadGroupLastMinute;
+    private final Scheduler commitsTrackerScheduler = new Scheduler("CommitsTracker background tasks");
 
-    CommitsTracker(int commitsCountMapMaxSize, boolean collectStackTraces) {
+    CommitsTracker(String[] threadGroups, int otherWritersLimit, boolean collectStackTraces) {
+        this.threadGroups = threadGroups;
         this.collectStackTraces = collectStackTraces;
-        this.commitsCountMap = new ConcurrentLinkedHashMap.Builder<String, Long>()
-                .maximumWeightedCapacity(commitsCountMapMaxSize).build();
+        this.commitsCountPerThreadGroup = new ConcurrentHashMap<>();
+        this.commitsCountPerThreadGroupLastMinute = new ConcurrentHashMap<>();
+        this.commitsCountOtherThreads = new ConcurrentLinkedHashMap.Builder<String, Long>()
+                .maximumWeightedCapacity(otherWritersLimit).build();
         this.queuedWritersMap = new ConcurrentHashMap<>();
+
+        commitsTrackerScheduler.scheduleWithFixedDelay("TarMK commits tracker stats resetter", 1, MINUTES,
+                this::resetStatistics);
     }
 
     public void trackQueuedCommitOf(Thread t) {
@@ -67,19 +80,54 @@ class CommitsTracker {
     }
 
     public void trackExecutedCommitOf(Thread t) {
-        commitsCountMap.compute(t.getName(), (w, v) -> v == null ? 1 : v + 1);
+        String group = findGroupFor(t);
+
+        if (group.equals("other")) {
+            commitsCountOtherThreads.compute(t.getName(), (w, v) -> v == null ? 1 : v + 1);
+        }
+
+        commitsCountPerThreadGroup.compute(group, (w, v) -> v == null ? 1 : v + 1);
+    }
+
+    private String findGroupFor(Thread t) {
+        if (threadGroups == null) {
+            return "other";
+        }
+        
+        for (String group : threadGroups) {
+            if (t.getName().matches(group)) {
+                return group;
+            }
+        }
+
+        return "other";
     }
 
-    public void setCollectStackTraces(boolean flag) {
-        this.collectStackTraces = flag;
+    private void resetStatistics() {
+        commitsCountPerThreadGroupLastMinute.clear();
+        commitsCountPerThreadGroupLastMinute.putAll(commitsCountPerThreadGroup);
+        commitsCountPerThreadGroup.clear();
+        commitsCountOtherThreads.clear();
     }
-    
+
+    @Override
+    public void close() {
+        commitsTrackerScheduler.close();
+    }
+
     public Map<String, String> getQueuedWritersMap() {
         return new HashMap<>(queuedWritersMap);
     }
 
-    public Map<String, Long> getCommitsCountMap() {
-        return new HashMap<>(commitsCountMap);
+    public Map<String, Long> getCommitsCountPerGroupLastMinute() {
+        return new HashMap<>(commitsCountPerThreadGroupLastMinute);
     }
 
+    public Map<String, Long> getCommitsCountOthers() {
+        return new HashMap<>(commitsCountOtherThreads);
+    }
+    
+    Map<String, Long> getCommitsCountPerGroup() {
+        return new HashMap<>(commitsCountPerThreadGroup);
+    }
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java?rev=1828345&r1=1828344&r2=1828345&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStats.java Wed Apr  4 13:58:18 2018
@@ -45,7 +45,7 @@ import org.apache.jackrabbit.oak.stats.T
 
 public class SegmentNodeStoreStats implements SegmentNodeStoreStatsMBean, SegmentNodeStoreMonitor {
     private static final boolean DEFAULT_COLLECT_STACK_TRACES = true;
-    private static final int DEFAULT_COMMITS_COUNT_MAP_SIZE = 20;
+    private static final int DEFAULT_OTHER_WRITERS_LIMIT = 20;
 
     public static final String COMMITS_COUNT = "COMMITS_COUNT";
     public static final String COMMIT_QUEUE_SIZE = "COMMIT_QUEUE_SIZE";
@@ -60,12 +60,13 @@ public class SegmentNodeStoreStats imple
     
     private volatile CommitsTracker commitsTracker;
     private boolean collectStackTraces = DEFAULT_COLLECT_STACK_TRACES;
-    private int commitsCountMapMaxSize = DEFAULT_COMMITS_COUNT_MAP_SIZE;
+    private int otherWritersLimit = DEFAULT_OTHER_WRITERS_LIMIT;
+    private String[] writerGroups;
     
     public SegmentNodeStoreStats(StatisticsProvider statisticsProvider) {
         this.statisticsProvider = statisticsProvider;
         
-        this.commitsTracker = new CommitsTracker(commitsCountMapMaxSize, collectStackTraces);
+        this.commitsTracker = new CommitsTracker(writerGroups, otherWritersLimit, collectStackTraces);
         this.commitsCount = statisticsProvider.getMeter(COMMITS_COUNT, StatsOptions.DEFAULT);
         this.commitQueueSize = statisticsProvider.getCounterStats(COMMIT_QUEUE_SIZE, StatsOptions.DEFAULT);
         this.commitTime = statisticsProvider.getTimer(COMMIT_TIME, StatsOptions.DEFAULT);
@@ -117,24 +118,35 @@ public class SegmentNodeStoreStats imple
     }
 
     @Override
-    public TabularData getCommitsCountPerWriter() throws OpenDataException {
-        CompositeType commitsPerWriterRowType = new CompositeType("commitsPerWriter", "commitsPerWriter",
-                new String[] { "count", "writerName" }, new String[] { "count", "writerName" },
+    public TabularData getCommitsCountPerWriterGroupLastMinute() throws OpenDataException {
+        return createTabularDataFromCountMap(commitsTracker.getCommitsCountPerGroupLastMinute(), "commitsPerWriterGroup",
+                "writerGroup");
+    }
+    
+    @Override
+    public TabularData getCommitsCountForOtherWriters() throws OpenDataException {
+        return createTabularDataFromCountMap(commitsTracker.getCommitsCountOthers(), "commitsPerWriter",
+                "writerName");
+    }
+    
+    private TabularData createTabularDataFromCountMap(Map<String, Long> commitsCountMap, String typeName,
+            String writerDescription) throws OpenDataException {
+        CompositeType commitsPerWriterRowType = new CompositeType(typeName, typeName,
+                new String[] { "count", writerDescription }, new String[] { "count", writerDescription },
                 new OpenType[] { SimpleType.LONG, SimpleType.STRING });
 
-        TabularDataSupport tabularData = new TabularDataSupport(new TabularType("commitsPerWriter",
-                "Most active writers", commitsPerWriterRowType, new String[] { "writerName" }));
+        TabularDataSupport tabularData = new TabularDataSupport(new TabularType(typeName, "Most active writers",
+                commitsPerWriterRowType, new String[] { writerDescription }));
 
-        Map<String, Long> commitsCountMap = commitsTracker.getCommitsCountMap();
         if (commitsCountMap.isEmpty()) {
             commitsCountMap.put("N/A", 0L);
         }
-        
+
         commitsCountMap.entrySet().stream()
                 .sorted(Comparator.<Entry<String, Long>> comparingLong(Entry::getValue).reversed()).map(e -> {
                     Map<String, Object> m = new HashMap<>();
                     m.put("count", e.getValue());
-                    m.put("writerName", e.getKey());
+                    m.put(writerDescription, e.getKey());
                     return m;
                 }).map(d -> mapToCompositeData(commitsPerWriterRowType, d)).forEach(tabularData::put);
 
@@ -168,7 +180,8 @@ public class SegmentNodeStoreStats imple
     @Override
     public void setCollectStackTraces(boolean flag) {
         this.collectStackTraces = flag;
-        commitsTracker.setCollectStackTraces(flag);
+        commitsTracker.close();
+        commitsTracker = new CommitsTracker(writerGroups, otherWritersLimit, collectStackTraces);
     }
     
     @Override
@@ -176,13 +189,28 @@ public class SegmentNodeStoreStats imple
         return collectStackTraces;
     }
     
-    public int getCommitsCountMapMaxSize() {
-        return commitsCountMapMaxSize;
+    @Override
+    public int getNumberOfOtherWritersToDetail() {
+        return otherWritersLimit;
     }
 
-    public void setCommitsCountMapMaxSize(int commitsCountMapMaxSize) {
-        this.commitsCountMapMaxSize = commitsCountMapMaxSize;
-        commitsTracker = new CommitsTracker(commitsCountMapMaxSize, collectStackTraces);
+    @Override
+    public void setNumberOfOtherWritersToDetail(int otherWritersLimit) {
+        this.otherWritersLimit = otherWritersLimit;
+        commitsTracker.close();
+        commitsTracker = new CommitsTracker(writerGroups, otherWritersLimit, collectStackTraces);
+    }
+    
+    @Override
+    public String[] getWriterGroupsForLastMinuteCounts() {
+        return writerGroups;
+    }
+
+    @Override
+    public void setWriterGroupsForLastMinuteCounts(String[] writerGroups) {
+        this.writerGroups = writerGroups;
+        commitsTracker.close();
+        commitsTracker = new CommitsTracker(writerGroups, otherWritersLimit, collectStackTraces);
     }
 
     private TimeSeries getTimeSeries(String name) {

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java?rev=1828345&r1=1828344&r2=1828345&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentNodeStoreStatsMBean.java Wed Apr  4 13:58:18 2018
@@ -47,10 +47,18 @@ public interface SegmentNodeStoreStatsMB
     CompositeData getQueuingTimes();
     
     /**
-     * @return tabular data of the form <commits,writer>
+     * @return tabular data of the form <commits,writerGroup> collected
+     *         <b>during the previous one-minute interval</b>
      * @throws OpenDataException if data is not available
      */
-    TabularData getCommitsCountPerWriter() throws OpenDataException;
+    TabularData getCommitsCountPerWriterGroupLastMinute() throws OpenDataException;
+    
+    /**
+     * @return tabular data of the form <commits,writer> for writers 
+     *         not included in groups
+     * @throws OpenDataException if data is not available
+     */
+    TabularData getCommitsCountForOtherWriters() throws OpenDataException;
     
     /**
      * @return tabular data of the form <writer,writerDetails> for each writer
@@ -61,7 +69,7 @@ public interface SegmentNodeStoreStatsMB
     
     /**
      * Turns on/off, depending on the value of {@code flag}, the collection of 
-     * stack traces for each writer thread.
+     * stack traces for each writer.
      * @param flag {@code boolean} indicating whether to collect or not
      */
     void setCollectStackTraces(boolean flag);
@@ -72,15 +80,29 @@ public interface SegmentNodeStoreStatsMB
     boolean isCollectStackTraces();
     
     /**
-     * Modifies the maximum number of writing threads to be recorded.
+     * Sets the maximum number of writers that do not match any defined
+     * writer group and are therefore recorded individually.
      * Changing the default value will reset the overall collection process.
      * 
-     * @param commitsCountMapSize the new size
+     * @param otherWritersLimit the new size
+     */
+    void setNumberOfOtherWritersToDetail(int otherWritersLimit);
+    
+    /**
+     * @return maximum number of writers outside already defined
+     * groups to be recorded
      */
-    void setCommitsCountMapMaxSize(int commitsCountMapMaxSize);
+    int getNumberOfOtherWritersToDetail();
     
     /**
-     * @return maximum number of writing threads to be recorded
+     * @return current groups used for grouping writers.
+     */
+    String[] getWriterGroupsForLastMinuteCounts();
+
+    /**
+     * Sets the groups used for grouping writers.
+     * Every call to this method resets the overall collection process.
+     * @param writerGroups regular expressions, each matched against a writer's entire thread name
      */
-    int getCommitsCountMapMaxSize();
+    void setWriterGroupsForLastMinuteCounts(String[] writerGroups);
 }

Modified: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java?rev=1828345&r1=1828344&r2=1828345&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java (original)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/CommitsTrackerTest.java Wed Apr  4 13:58:18 2018
@@ -21,11 +21,13 @@ package org.apache.jackrabbit.oak.segmen
 
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.jackrabbit.oak.commons.concurrent.ExecutorCloser;
 import org.junit.Test;
@@ -52,8 +54,8 @@ public class CommitsTrackerTest {
     }
 
     @Test
-    public void testSizeConstraints() throws InterruptedException {
-        CommitsTracker commitsTracker = new CommitsTracker(10, false);
+    public void testCommitsCountOthers() throws InterruptedException {
+        CommitsTracker commitsTracker = new CommitsTracker(new String[] {}, 10, false);
         ExecutorService executorService = newFixedThreadPool(30);
         final CountDownLatch addLatch = new CountDownLatch(25);
 
@@ -78,11 +80,11 @@ public class CommitsTrackerTest {
             }
 
             addLatch.await();
-            Map<String, Long> commitsCountMap = commitsTracker.getCommitsCountMap();
+            Map<String, Long> commitsCountOthersMap = commitsTracker.getCommitsCountOthers();
             Map<String, String> queuedWritersMap = commitsTracker.getQueuedWritersMap();
 
-            assertTrue(commitsCountMap.size() >= 10);
-            assertTrue(commitsCountMap.size() < 20);
+            assertTrue(commitsCountOthersMap.size() >= 10);
+            assertTrue(commitsCountOthersMap.size() < 20);
             assertEquals(5, queuedWritersMap.size());
 
             CountDownLatch removeLatch = new CountDownLatch(5);
@@ -94,6 +96,42 @@ public class CommitsTrackerTest {
             queuedWritersMap = commitsTracker.getQueuedWritersMap();
             assertEquals(0, queuedWritersMap.size());
         } finally {
+            commitsTracker.close();
+            new ExecutorCloser(executorService).close();
+        }
+    }
+
+    @Test
+    public void testCommitsCountPerGroup() throws InterruptedException {
+        String[] groups = new String[] { "Thread-1.*", "Thread-2.*", "Thread-3.*" };
+        CommitsTracker commitsTracker = new CommitsTracker(groups, 10, false);
+        ExecutorService executorService = newFixedThreadPool(30);
+        AtomicInteger counter = new AtomicInteger(10);
+        final CountDownLatch latch = new CountDownLatch(30);
+
+        Runnable executedCommitTask = () -> {
+            Thread.currentThread().setName("Thread-" + counter.getAndIncrement());
+            commitsTracker.trackExecutedCommitOf(Thread.currentThread());
+            latch.countDown();
+        };
+
+        try {
+            for (int i = 0; i < 30; i++) {
+                executorService.submit(executedCommitTask);
+            }
+            
+            latch.await();
+            
+            Map<String, Long> commitsCountPerGroup = commitsTracker.getCommitsCountPerGroup();
+            assertEquals(3, commitsCountPerGroup.size());
+            
+            for (String group : groups) {
+                Long groupCount = commitsCountPerGroup.get(group);
+                assertNotNull(groupCount);
+                assertEquals(10, (long) groupCount);
+            }
+        } finally {
+            commitsTracker.close();
             new ExecutorCloser(executorService).close();
         }
     }