You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by am...@apache.org on 2014/10/30 10:07:29 UTC

svn commit: r1635436 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: Oak.java plugins/index/AsyncIndexUpdate.java

Author: amitj
Date: Thu Oct 30 09:07:28 2014
New Revision: 1635436

URL: http://svn.apache.org/r1635436
Log:
OAK-2230: Execution Stats for async indexing
Remove the custom executor from ExecutionStats; the time-series execution updates are now scheduled from the Oak class through AsyncIndexStats, which implements Runnable.
Make ExecutionStats and its members private, and stop resetting the consolidated stats.

Modified:
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
    jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java?rev=1635436&r1=1635435&r2=1635436&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/Oak.java Thu Oct 30 09:07:28 2014
@@ -528,6 +528,9 @@ public class Oak {
             regs.add(scheduleWithFixedDelay(whiteboard, task, 5, true));
             regs.add(registerMBean(whiteboard, IndexStatsMBean.class,
                     task.getIndexStats(), IndexStatsMBean.TYPE, name));
+            // Register AsyncIndexStats for execution stats update
+            regs.add(
+                scheduleWithFixedDelay(whiteboard, task.getIndexStats(), 1, false));
 
             PropertyIndexAsyncReindex asyncPI = new PropertyIndexAsyncReindex(
                     new AsyncIndexUpdate(IndexConstants.ASYNC_REINDEX_VALUE,
@@ -629,4 +632,4 @@ public class Oak {
         return createContentSession().getLatestRoot();
     }
 
-}
\ No newline at end of file
+}

Modified: jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1635436&r1=1635435&r2=1635436&view=diff
==============================================================================
--- jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java (original)
+++ jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java Thu Oct 30 09:07:28 2014
@@ -28,9 +28,7 @@ import static org.apache.jackrabbit.oak.
 import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
+
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -394,7 +392,6 @@ public class AsyncIndexUpdate implements
                     reindexedDefinitions.clear();
                 }
                 postAsyncRunStatsStatus(indexStats);
-                indexStats.resetConsolidatedExecutionStats();
             }
             mergeWithConcurrencyCheck(builder, beforeCheckpoint, callback.lease);
         } finally {
@@ -447,7 +444,7 @@ public class AsyncIndexUpdate implements
         return indexStats.getStatus() == STATUS_DONE;
     }
 
-    final class AsyncIndexStats implements IndexStatsMBean {
+    final class AsyncIndexStats implements IndexStatsMBean, Runnable {
 
         private String start = "";
         private String done = "";
@@ -458,8 +455,8 @@ public class AsyncIndexUpdate implements
 
         private volatile boolean isPaused;
         private volatile long updates;
-        private Stopwatch watch = Stopwatch.createUnstarted();
-        private ExecutionStats execStats = new ExecutionStats();
+        private final Stopwatch watch = Stopwatch.createUnstarted();
+        private final ExecutionStats execStats = new ExecutionStats();
 
         public void start(String now) {
             status = STATUS_RUNNING;
@@ -589,39 +586,28 @@ public class AsyncIndexUpdate implements
                     + " ,tempCheckpoints=" + tempCps + " ]";
         }
 
-        class ExecutionStats {
-            final ScheduledExecutorService executor;
-            final TimeSeriesRecorder execCounter;
-            final TimeSeriesRecorder execTimer;
+        @Override
+        public void run() {
+            execStats.recordTick();
+        }
+
+        private class ExecutionStats {
+            private final TimeSeriesRecorder execCounter;
+            private final TimeSeriesRecorder execTimer;
 
             /**
              * Captures consolidated execution stats since last reset
              */
-            final AtomicLong consolidatedExecTime = new AtomicLong();
-            final AtomicInteger consolidatedExecRuns = new AtomicInteger();
-            final AtomicLong consolidatedNodes = new AtomicLong();
-            final String[] names = {"Executions", "Execution Time", "Nodes"};
-            CompositeType consolidatedType;
-
-            ExecutionStats() {
-                executor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-                    @Override
-                    public Thread newThread(@Nonnull Runnable r) {
-                        Thread thread = new Thread(r, "AsyncIndexStats-ExecutionsStats");
-                        thread.setDaemon(true);
-                        return thread;
-                    }
-                });
+            private final AtomicLong consolidatedExecTime = new AtomicLong();
+            private final AtomicInteger consolidatedExecRuns = new AtomicInteger();
+            private final AtomicLong consolidatedNodes = new AtomicLong();
+            private final String[] names = {"Executions", "Execution Time", "Nodes"};
+            private CompositeType consolidatedType;
+
+            private ExecutionStats() {
                 execCounter = new TimeSeriesRecorder(true);
                 execTimer = new TimeSeriesRecorder(true);
 
-                executor.scheduleAtFixedRate(new Runnable() {
-                    public void run() {
-                        execCounter.recordOneSecond();
-                        execTimer.recordOneSecond();
-                    }
-                }, 1, 1, TimeUnit.SECONDS);
-
                 try {
                     consolidatedType = new CompositeType("ConsolidatedStats",
                         "Consolidated stats", names,
@@ -632,26 +618,26 @@ public class AsyncIndexUpdate implements
                 }
             }
 
-            void incrementCounter() {
+            private void incrementCounter() {
                 execCounter.getCounter().incrementAndGet();
                 consolidatedExecRuns.incrementAndGet();
             }
 
-            void recordExecution(long time, long updates) {
+            private void recordExecution(long time, long updates) {
                 execTimer.getCounter().addAndGet(time);
                 consolidatedExecTime.addAndGet(time);
                 consolidatedNodes.addAndGet(updates);
             }
 
-            public CompositeData getExecutionCount() {
+            private CompositeData getExecutionCount() {
                 return TimeSeriesStatsUtil.asCompositeData(execCounter, "ExecutionCount");
             }
 
-            public CompositeData getExecutionTime() {
+            private CompositeData getExecutionTime() {
                 return TimeSeriesStatsUtil.asCompositeData(execTimer, "ExecutionTime");
             }
 
-            public CompositeData getConsolidatedStats() {
+            private CompositeData getConsolidatedStats() {
                 try {
                     Long[] values = new Long[]{consolidatedExecRuns.longValue(),
                         consolidatedExecTime.longValue(), consolidatedNodes.longValue()};
@@ -662,11 +648,16 @@ public class AsyncIndexUpdate implements
                 }
             }
 
-            public void resetConsolidatedStats() {
+            private void resetConsolidatedStats() {
                 consolidatedExecRuns.set(0);
                 consolidatedExecTime.set(0);
                 consolidatedNodes.set(0);
             }
+
+            private void recordTick() {
+                execCounter.recordOneSecond();
+                execTimer.recordOneSecond();
+            }
         }
     }