You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-dev@jackrabbit.apache.org by Marcel Reutegger <mr...@adobe.com> on 2014/10/29 17:40:21 UTC

Re: svn commit: r1635058 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: api/jmx/IndexStatsMBean.java plugins/index/AsyncIndexUpdate.java stats/RepositoryStats.java stats/TimeSeriesStatsUtil.java

Hi,

I think this commit causes an OutOfMemoryError (OOME) when the build is run
with the node store fixtures for MongoDB and RDB.

The heap dump shows many DocumentNodeStore instances, which cannot be
garbage collected because the ScheduledExecutorService instance introduced
with this change is still running and references the store.

Would it be an option to use the ScheduledExecutorService from
org.apache.jackrabbit.oak.Oak?

Regards
 Marcel

On 29/10/14 07:17, "amitj@apache.org" <am...@apache.org> wrote:

>Author: amitj
>Date: Wed Oct 29 06:17:00 2014
>New Revision: 1635058
>
>URL: http://svn.apache.org/r1635058
>Log:
>OAK-2230: Execution Stats for async indexing
>Added resettable consolidated execution stats
>TimeSeries for trends
>
>Added:
>    
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/TimeSeriesStatsUtil.java   (with props)
>Modified:
>    
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
>jmx/IndexStatsMBean.java
>    
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdate.java
>    
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/RepositoryStats.java
>
>Modified: 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
>jmx/IndexStatsMBean.java
>URL: 
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1635058&r1=16350
>57&r2=1635058&view=diff
>==========================================================================
>====
>--- 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
>jmx/IndexStatsMBean.java (original)
>+++ 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
>jmx/IndexStatsMBean.java Wed Oct 29 06:17:00 2014
>@@ -17,6 +17,8 @@
> 
> package org.apache.jackrabbit.oak.api.jmx;
> 
>+import javax.management.openmbean.CompositeData;
>+
> public interface IndexStatsMBean {
> 
>     String TYPE = "IndexStats";
>@@ -114,4 +116,28 @@ public interface IndexStatsMBean {
>      */
>     String getTemporaryCheckpoints();
> 
>+    /**
>+     * Returns the number of executions as a {@link
>org.apache.jackrabbit.api.stats.TimeSeries}.
>+     *
>+     * @return the execution count time series
>+     */
>+    CompositeData getExecutionCount();
>+
>+    /**
>+     * Returns the execution time as a {@link
>org.apache.jackrabbit.api.stats.TimeSeries}.
>+     *
>+     * @return the execution times time series
>+     */
>+    CompositeData getExecutionTime();
>+
>+    /**
>+     * Returns the consolidated execution stats since last reset
>+     * @return consolidated execution stats
>+     */
>+    CompositeData getConsolidatedExecutionStats();
>+
>+    /**
>+     * Resets the consolidated stats.
>+     */
>+    void resetConsolidatedExecutionStats();
> }
>
>Modified: 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdate.java
>URL: 
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1635058&r
>1=1635057&r2=1635058&view=diff
>==========================================================================
>====
>--- 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdate.java (original)
>+++ 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
>ins/index/AsyncIndexUpdate.java Wed Oct 29 06:17:00 2014
>@@ -28,10 +28,22 @@ import static org.apache.jackrabbit.oak.
> import java.util.Calendar;
> import java.util.HashSet;
> import java.util.Set;
>+import java.util.concurrent.Executors;
>+import java.util.concurrent.ScheduledExecutorService;
>+import java.util.concurrent.ThreadFactory;
> import java.util.concurrent.TimeUnit;
>+import java.util.concurrent.atomic.AtomicInteger;
>+import java.util.concurrent.atomic.AtomicLong;
> 
> import javax.annotation.Nonnull;
>+import javax.management.openmbean.CompositeData;
>+import javax.management.openmbean.CompositeDataSupport;
>+import javax.management.openmbean.CompositeType;
>+import javax.management.openmbean.OpenDataException;
>+import javax.management.openmbean.OpenType;
>+import javax.management.openmbean.SimpleType;
> 
>+import com.google.common.base.Stopwatch;
> import org.apache.jackrabbit.oak.api.CommitFailedException;
> import org.apache.jackrabbit.oak.api.PropertyState;
> import org.apache.jackrabbit.oak.api.Type;
>@@ -51,6 +63,8 @@ import org.apache.jackrabbit.oak.spi.sta
> import org.apache.jackrabbit.oak.spi.state.NodeState;
> import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
> import org.apache.jackrabbit.oak.spi.state.NodeStore;
>+import org.apache.jackrabbit.oak.stats.TimeSeriesStatsUtil;
>+import org.apache.jackrabbit.stats.TimeSeriesRecorder;
> import org.apache.jackrabbit.util.ISO8601;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>@@ -380,6 +394,7 @@ public class AsyncIndexUpdate implements
>                     reindexedDefinitions.clear();
>                 }
>                 postAsyncRunStatsStatus(indexStats);
>+                indexStats.resetConsolidatedExecutionStats();
>             }
>             mergeWithConcurrencyCheck(builder, beforeCheckpoint,
>callback.lease);
>         } finally {
>@@ -443,17 +458,29 @@ public class AsyncIndexUpdate implements
> 
>         private volatile boolean isPaused;
>         private volatile long updates;
>+        private Stopwatch watch = Stopwatch.createUnstarted();
>+        private ExecutionStats execStats = new ExecutionStats();
> 
>         public void start(String now) {
>             status = STATUS_RUNNING;
>             start = now;
>             done = "";
>+
>+            if (watch.isRunning()) {
>+                watch.reset();
>+            }
>+            watch.start();
>         }
> 
>         public void done(String now) {
>             status = STATUS_DONE;
>-            start = "";
>             done = now;
>+            if (watch.isRunning()) {
>+                watch.stop();
>+            }
>+            execStats.incrementCounter();
>+            
>execStats.recordExecution(watch.elapsed(TimeUnit.MILLISECONDS), updates);
>+            watch.reset();
>         }
> 
>         @Override
>@@ -534,6 +561,26 @@ public class AsyncIndexUpdate implements
>         }
> 
>         @Override
>+        public CompositeData getExecutionCount() {
>+            return execStats.getExecutionCount();
>+        }
>+
>+        @Override
>+        public CompositeData getExecutionTime() {
>+            return execStats.getExecutionTime();
>+        }
>+
>+        @Override
>+        public CompositeData getConsolidatedExecutionStats() {
>+            return execStats.getConsolidatedStats();
>+        }
>+
>+        @Override
>+        public void resetConsolidatedExecutionStats() {
>+            execStats.resetConsolidatedStats();
>+        }
>+
>+        @Override
>         public String toString() {
>             return "AsyncIndexStats [start=" + start + ", done=" + done
>                     + ", status=" + status + ", paused=" + isPaused
>@@ -541,6 +588,86 @@ public class AsyncIndexUpdate implements
>                     + referenceCp + ", processedCheckpoint=" +
>processedCp
>                     + " ,tempCheckpoints=" + tempCps + " ]";
>         }
>+
>+        class ExecutionStats {
>+            final ScheduledExecutorService executor;
>+            final TimeSeriesRecorder execCounter;
>+            final TimeSeriesRecorder execTimer;
>+
>+            /**
>+             * Captures consolidated execution stats since last reset
>+             */
>+            final AtomicLong consolidatedExecTime = new AtomicLong();
>+            final AtomicInteger consolidatedExecRuns = new
>AtomicInteger();
>+            final AtomicLong consolidatedNodes = new AtomicLong();
>+            final String[] names = {"Executions", "Execution Time",
>"Nodes"};
>+            CompositeType consolidatedType;
>+
>+            ExecutionStats() {
>+                executor =
>Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
>+                    @Override
>+                    public Thread newThread(@Nonnull Runnable r) {
>+                        Thread thread = new Thread(r,
>"AsyncIndexStats-ExecutionsStats");
>+                        thread.setDaemon(true);
>+                        return thread;
>+                    }
>+                });
>+                execCounter = new TimeSeriesRecorder(true);
>+                execTimer = new TimeSeriesRecorder(true);
>+
>+                executor.scheduleAtFixedRate(new Runnable() {
>+                    public void run() {
>+                        execCounter.recordOneSecond();
>+                        execTimer.recordOneSecond();
>+                    }
>+                }, 1, 1, TimeUnit.SECONDS);
>+
>+                try {
>+                    consolidatedType = new
>CompositeType("ConsolidatedStats",
>+                        "Consolidated stats", names,
>+                        names,
>+                        new OpenType[] {SimpleType.LONG,
>SimpleType.LONG, SimpleType.LONG});
>+                } catch (OpenDataException e) {
>+                    log.warn("Error in creating CompositeType for
>consolidated stats", e);
>+                }
>+            }
>+
>+            void incrementCounter() {
>+                execCounter.getCounter().incrementAndGet();
>+                consolidatedExecRuns.incrementAndGet();
>+            }
>+
>+            void recordExecution(long time, long updates) {
>+                execTimer.getCounter().addAndGet(time);
>+                consolidatedExecTime.addAndGet(time);
>+                consolidatedNodes.addAndGet(updates);
>+            }
>+
>+            public CompositeData getExecutionCount() {
>+                return TimeSeriesStatsUtil.asCompositeData(execCounter,
>"ExecutionCount");
>+            }
>+
>+            public CompositeData getExecutionTime() {
>+                return TimeSeriesStatsUtil.asCompositeData(execTimer,
>"ExecutionTime");
>+            }
>+
>+            public CompositeData getConsolidatedStats() {
>+                try {
>+                    Long[] values = new
>Long[]{consolidatedExecRuns.longValue(),
>+                        consolidatedExecTime.longValue(),
>consolidatedNodes.longValue()};
>+                    return new CompositeDataSupport(consolidatedType,
>names, values);
>+                } catch (Exception e) {
>+                    log.error("Error retrieving consolidated stats", e);
>+                    return null;
>+                }
>+            }
>+
>+            public void resetConsolidatedStats() {
>+                consolidatedExecRuns.set(0);
>+                consolidatedExecTime.set(0);
>+                consolidatedNodes.set(0);
>+            }
>+        }
>     }
> 
>     /**
>
>Modified: 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/RepositoryStats.java
>URL: 
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/stats/RepositoryStats.java?rev=1635058&r1=1635057
>&r2=1635058&view=diff
>==========================================================================
>====
>--- 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/RepositoryStats.java (original)
>+++ 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/RepositoryStats.java Wed Oct 29 06:17:00 2014
>@@ -35,13 +35,7 @@ import static org.apache.jackrabbit.api.
> import static 
>org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_CO
>UNTER;
> import static 
>org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_DU
>RATION;
> 
>-import javax.management.openmbean.ArrayType;
> import javax.management.openmbean.CompositeData;
>-import javax.management.openmbean.CompositeDataSupport;
>-import javax.management.openmbean.CompositeType;
>-import javax.management.openmbean.OpenDataException;
>-import javax.management.openmbean.OpenType;
>-import javax.management.openmbean.SimpleType;
> 
> import org.apache.jackrabbit.api.stats.RepositoryStatistics;
> import org.apache.jackrabbit.api.stats.TimeSeries;
>@@ -132,39 +126,16 @@ public class RepositoryStats implements
> 
>     @Override
>     public CompositeData getObservationQueueMaxLength() {
>-        return asCompositeData(maxQueueLength, "maximal length of
>observation queue");
>+        return TimeSeriesStatsUtil
>+            .asCompositeData(maxQueueLength, "maximal length of
>observation queue");
>     }
> 
>-    public static final String[] ITEM_NAMES = new String[] {
>-            "per second", "per minute", "per hour", "per week"};
>-
>     private TimeSeries getTimeSeries(Type type) {
>         return repoStats.getTimeSeries(type);
>     }
> 
>     private CompositeData asCompositeData(Type type) {
>-        return asCompositeData(getTimeSeries(type), type.name());
>-    }
>-
>-    private static CompositeData asCompositeData(TimeSeries timeSeries,
>String name) {
>-        try {
>-            long[][] values = new long[][] {
>-                timeSeries.getValuePerSecond(),
>-                timeSeries.getValuePerMinute(),
>-                timeSeries.getValuePerHour(),
>-                timeSeries.getValuePerWeek()};
>-            return new CompositeDataSupport(getCompositeType(name),
>ITEM_NAMES, values);
>-        } catch (Exception e) {
>-            LOG.error("Error creating CompositeData instance from
>TimeSeries", e);
>-            return null;
>-        }
>-    }
>-
>-    private static CompositeType getCompositeType(String name) throws
>OpenDataException {
>-        ArrayType<int[]> longArrayType = new
>ArrayType<int[]>(SimpleType.LONG, true);
>-        OpenType<?>[] itemTypes = new OpenType[] {
>-                longArrayType, longArrayType, longArrayType,
>longArrayType};
>-        return new CompositeType(name, name + " time series",
>ITEM_NAMES, ITEM_NAMES, itemTypes);
>+        return TimeSeriesStatsUtil.asCompositeData(getTimeSeries(type),
>type.name());
>     }
> 
> }
>
>Added: 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/TimeSeriesStatsUtil.java
>URL: 
>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
>rg/apache/jackrabbit/oak/stats/TimeSeriesStatsUtil.java?rev=1635058&view=a
>uto
>==========================================================================
>====
>--- 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/TimeSeriesStatsUtil.java (added)
>+++ 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/TimeSeriesStatsUtil.java Wed Oct 29 06:17:00 2014
>@@ -0,0 +1,58 @@
>+/*
>+ * Licensed to the Apache Software Foundation (ASF) under one
>+ * or more contributor license agreements.  See the NOTICE file
>+ * distributed with this work for additional information
>+ * regarding copyright ownership.  The ASF licenses this file
>+ * to you under the Apache License, Version 2.0 (the
>+ * "License"); you may not use this file except in compliance
>+ * with the License.  You may obtain a copy of the License at
>+ *
>+ *   http://www.apache.org/licenses/LICENSE-2.0
>+ *
>+ * Unless required by applicable law or agreed to in writing,
>+ * software distributed under the License is distributed on an
>+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>+ * KIND, either express or implied.  See the License for the
>+ * specific language governing permissions and limitations
>+ * under the License.
>+ */
>+package org.apache.jackrabbit.oak.stats;
>+
>+import org.apache.jackrabbit.api.stats.TimeSeries;
>+import org.slf4j.Logger;
>+import org.slf4j.LoggerFactory;
>+
>+import javax.management.openmbean.ArrayType;
>+import javax.management.openmbean.CompositeData;
>+import javax.management.openmbean.CompositeDataSupport;
>+import javax.management.openmbean.CompositeType;
>+import javax.management.openmbean.OpenDataException;
>+import javax.management.openmbean.OpenType;
>+import javax.management.openmbean.SimpleType;
>+
>+/**
>+ * Utility class for retrieving {@link
>javax.management.openmbean.CompositeData} for
>+ * {@link org.apache.jackrabbit.api.stats.TimeSeries}.
>+ */
>+public class TimeSeriesStatsUtil {
>+    public static final String[] ITEM_NAMES = new String[] {"per
>second", "per minute", "per hour", "per week"};
>+
>+    private static final Logger LOG =
>LoggerFactory.getLogger(TimeSeriesStatsUtil.class);
>+
>+    public static CompositeData asCompositeData(TimeSeries timeSeries,
>String name) {
>+        try {
>+            long[][] values = new long[][]
>{timeSeries.getValuePerSecond(), timeSeries.getValuePerMinute(),
>+                timeSeries.getValuePerHour(),
>timeSeries.getValuePerWeek()};
>+            return new CompositeDataSupport(getCompositeType(name),
>ITEM_NAMES, values);
>+        } catch (Exception e) {
>+            LOG.error("Error creating CompositeData instance from
>TimeSeries", e);
>+            return null;
>+        }
>+    }
>+
>+    static CompositeType getCompositeType(String name) throws
>OpenDataException {
>+        ArrayType<int[]> longArrayType = new
>ArrayType<int[]>(SimpleType.LONG, true);
>+        OpenType<?>[] itemTypes = new OpenType[] {longArrayType,
>longArrayType, longArrayType, longArrayType};
>+        return new CompositeType(name, name + " time series",
>ITEM_NAMES, ITEM_NAMES, itemTypes);
>+    }
>+}
>
>Propchange: 
>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
>s/TimeSeriesStatsUtil.java
>--------------------------------------------------------------------------
>----
>    svn:eol-style = native
>
>


Re: svn commit: r1635058 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: api/jmx/IndexStatsMBean.java plugins/index/AsyncIndexUpdate.java stats/RepositoryStats.java stats/TimeSeriesStatsUtil.java

Posted by Marcel Reutegger <mr...@adobe.com>.
looks good now. thanks for the fix.

regards
 marcel

On 30/10/14 10:12, "Amit Jain" <am...@ieee.org> wrote:

>Hi,
>
>>>Would it be an option to use the ScheduledExecutorService from
>org.apache.jackrabbit.oak.Oak?
>
>In revision 1635436, I removed the custom executor service and used
>whiteboard instance to schedule in org.apache.jackrabbit.oak.Oak.
>
>Thanks
>Amit
>
>On Wed, Oct 29, 2014 at 10:10 PM, Marcel Reutegger <mr...@adobe.com>
>wrote:
>
>> Hi,
>>
>> I think this commit causes OOME when the build is run with the node
>>store
>> fixtures for MongoDB and RDB.
>>
>> The heap dump shows many DocumentNodeStore instances, which cannot be
>> garbage collected because the ScheduledExecutorService instance
>>introduce
>> with this change is still running and references the store.
>>
>> Would it be an option to use the ScheduledExecutorService from
>> org.apache.jackrabbit.oak.Oak?
>>
>> Regards
>>  Marcel
>>
>> On 29/10/14 07:17, "amitj@apache.org" <am...@apache.org> wrote:
>>
>> >Author: amitj
>> >Date: Wed Oct 29 06:17:00 2014
>> >New Revision: 1635058
>> >
>> >URL: http://svn.apache.org/r1635058
>> >Log:
>> >OAK-2230: Execution Stats for async indexing
>> >Added resettable consolidated execution stats
>> >TimeSeries for trends
>> >
>> >Added:
>> >
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java   (with props)
>> >Modified:
>> >
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/ap
>>>i/
>> >jmx/IndexStatsMBean.java
>> >
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/pl
>>>ug
>> >ins/index/AsyncIndexUpdate.java
>> >
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/RepositoryStats.java
>> >
>> >Modified:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/ap
>>>i/
>> >jmx/IndexStatsMBean.java
>> >URL:
>> >
>> 
>>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/
>>o
>> 
>>>rg/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1635058&r1=163
>>>50
>> >57&r2=1635058&view=diff
>> 
>>>========================================================================
>>>==
>> >====
>> >---
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/ap
>>>i/
>> >jmx/IndexStatsMBean.java (original)
>> >+++
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/ap
>>>i/
>> >jmx/IndexStatsMBean.java Wed Oct 29 06:17:00 2014
>> >@@ -17,6 +17,8 @@
>> >
>> > package org.apache.jackrabbit.oak.api.jmx;
>> >
>> >+import javax.management.openmbean.CompositeData;
>> >+
>> > public interface IndexStatsMBean {
>> >
>> >     String TYPE = "IndexStats";
>> >@@ -114,4 +116,28 @@ public interface IndexStatsMBean {
>> >      */
>> >     String getTemporaryCheckpoints();
>> >
>> >+    /**
>> >+     * Returns the number of executions as a {@link
>> >org.apache.jackrabbit.api.stats.TimeSeries}.
>> >+     *
>> >+     * @return the execution count time series
>> >+     */
>> >+    CompositeData getExecutionCount();
>> >+
>> >+    /**
>> >+     * Returns the execution time as a {@link
>> >org.apache.jackrabbit.api.stats.TimeSeries}.
>> >+     *
>> >+     * @return the execution times time series
>> >+     */
>> >+    CompositeData getExecutionTime();
>> >+
>> >+    /**
>> >+     * Returns the consolidated execution stats since last reset
>> >+     * @return consolidated execution stats
>> >+     */
>> >+    CompositeData getConsolidatedExecutionStats();
>> >+
>> >+    /**
>> >+     * Resets the consolidated stats.
>> >+     */
>> >+    void resetConsolidatedExecutionStats();
>> > }
>> >
>> >Modified:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/pl
>>>ug
>> >ins/index/AsyncIndexUpdate.java
>> >URL:
>> >
>> 
>>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/
>>o
>> 
>>>rg/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1635058
>>>&r
>> >1=1635057&r2=1635058&view=diff
>> 
>>>========================================================================
>>>==
>> >====
>> >---
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/pl
>>>ug
>> >ins/index/AsyncIndexUpdate.java (original)
>> >+++
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/pl
>>>ug
>> >ins/index/AsyncIndexUpdate.java Wed Oct 29 06:17:00 2014
>> >@@ -28,10 +28,22 @@ import static org.apache.jackrabbit.oak.
>> > import java.util.Calendar;
>> > import java.util.HashSet;
>> > import java.util.Set;
>> >+import java.util.concurrent.Executors;
>> >+import java.util.concurrent.ScheduledExecutorService;
>> >+import java.util.concurrent.ThreadFactory;
>> > import java.util.concurrent.TimeUnit;
>> >+import java.util.concurrent.atomic.AtomicInteger;
>> >+import java.util.concurrent.atomic.AtomicLong;
>> >
>> > import javax.annotation.Nonnull;
>> >+import javax.management.openmbean.CompositeData;
>> >+import javax.management.openmbean.CompositeDataSupport;
>> >+import javax.management.openmbean.CompositeType;
>> >+import javax.management.openmbean.OpenDataException;
>> >+import javax.management.openmbean.OpenType;
>> >+import javax.management.openmbean.SimpleType;
>> >
>> >+import com.google.common.base.Stopwatch;
>> > import org.apache.jackrabbit.oak.api.CommitFailedException;
>> > import org.apache.jackrabbit.oak.api.PropertyState;
>> > import org.apache.jackrabbit.oak.api.Type;
>> >@@ -51,6 +63,8 @@ import org.apache.jackrabbit.oak.spi.sta
>> > import org.apache.jackrabbit.oak.spi.state.NodeState;
>> > import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
>> > import org.apache.jackrabbit.oak.spi.state.NodeStore;
>> >+import org.apache.jackrabbit.oak.stats.TimeSeriesStatsUtil;
>> >+import org.apache.jackrabbit.stats.TimeSeriesRecorder;
>> > import org.apache.jackrabbit.util.ISO8601;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >@@ -380,6 +394,7 @@ public class AsyncIndexUpdate implements
>> >                     reindexedDefinitions.clear();
>> >                 }
>> >                 postAsyncRunStatsStatus(indexStats);
>> >+                indexStats.resetConsolidatedExecutionStats();
>> >             }
>> >             mergeWithConcurrencyCheck(builder, beforeCheckpoint,
>> >callback.lease);
>> >         } finally {
>> >@@ -443,17 +458,29 @@ public class AsyncIndexUpdate implements
>> >
>> >         private volatile boolean isPaused;
>> >         private volatile long updates;
>> >+        private Stopwatch watch = Stopwatch.createUnstarted();
>> >+        private ExecutionStats execStats = new ExecutionStats();
>> >
>> >         public void start(String now) {
>> >             status = STATUS_RUNNING;
>> >             start = now;
>> >             done = "";
>> >+
>> >+            if (watch.isRunning()) {
>> >+                watch.reset();
>> >+            }
>> >+            watch.start();
>> >         }
>> >
>> >         public void done(String now) {
>> >             status = STATUS_DONE;
>> >-            start = "";
>> >             done = now;
>> >+            if (watch.isRunning()) {
>> >+                watch.stop();
>> >+            }
>> >+            execStats.incrementCounter();
>> >+
>> >execStats.recordExecution(watch.elapsed(TimeUnit.MILLISECONDS),
>>updates);
>> >+            watch.reset();
>> >         }
>> >
>> >         @Override
>> >@@ -534,6 +561,26 @@ public class AsyncIndexUpdate implements
>> >         }
>> >
>> >         @Override
>> >+        public CompositeData getExecutionCount() {
>> >+            return execStats.getExecutionCount();
>> >+        }
>> >+
>> >+        @Override
>> >+        public CompositeData getExecutionTime() {
>> >+            return execStats.getExecutionTime();
>> >+        }
>> >+
>> >+        @Override
>> >+        public CompositeData getConsolidatedExecutionStats() {
>> >+            return execStats.getConsolidatedStats();
>> >+        }
>> >+
>> >+        @Override
>> >+        public void resetConsolidatedExecutionStats() {
>> >+            execStats.resetConsolidatedStats();
>> >+        }
>> >+
>> >+        @Override
>> >         public String toString() {
>> >             return "AsyncIndexStats [start=" + start + ", done=" +
>>done
>> >                     + ", status=" + status + ", paused=" + isPaused
>> >@@ -541,6 +588,86 @@ public class AsyncIndexUpdate implements
>> >                     + referenceCp + ", processedCheckpoint=" +
>> >processedCp
>> >                     + " ,tempCheckpoints=" + tempCps + " ]";
>> >         }
>> >+
>> >+        class ExecutionStats {
>> >+            final ScheduledExecutorService executor;
>> >+            final TimeSeriesRecorder execCounter;
>> >+            final TimeSeriesRecorder execTimer;
>> >+
>> >+            /**
>> >+             * Captures consolidated execution stats since last reset
>> >+             */
>> >+            final AtomicLong consolidatedExecTime = new AtomicLong();
>> >+            final AtomicInteger consolidatedExecRuns = new
>> >AtomicInteger();
>> >+            final AtomicLong consolidatedNodes = new AtomicLong();
>> >+            final String[] names = {"Executions", "Execution Time",
>> >"Nodes"};
>> >+            CompositeType consolidatedType;
>> >+
>> >+            ExecutionStats() {
>> >+                executor =
>> >Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
>> >+                    @Override
>> >+                    public Thread newThread(@Nonnull Runnable r) {
>> >+                        Thread thread = new Thread(r,
>> >"AsyncIndexStats-ExecutionsStats");
>> >+                        thread.setDaemon(true);
>> >+                        return thread;
>> >+                    }
>> >+                });
>> >+                execCounter = new TimeSeriesRecorder(true);
>> >+                execTimer = new TimeSeriesRecorder(true);
>> >+
>> >+                executor.scheduleAtFixedRate(new Runnable() {
>> >+                    public void run() {
>> >+                        execCounter.recordOneSecond();
>> >+                        execTimer.recordOneSecond();
>> >+                    }
>> >+                }, 1, 1, TimeUnit.SECONDS);
>> >+
>> >+                try {
>> >+                    consolidatedType = new
>> >CompositeType("ConsolidatedStats",
>> >+                        "Consolidated stats", names,
>> >+                        names,
>> >+                        new OpenType[] {SimpleType.LONG,
>> >SimpleType.LONG, SimpleType.LONG});
>> >+                } catch (OpenDataException e) {
>> >+                    log.warn("Error in creating CompositeType for
>> >consolidated stats", e);
>> >+                }
>> >+            }
>> >+
>> >+            void incrementCounter() {
>> >+                execCounter.getCounter().incrementAndGet();
>> >+                consolidatedExecRuns.incrementAndGet();
>> >+            }
>> >+
>> >+            void recordExecution(long time, long updates) {
>> >+                execTimer.getCounter().addAndGet(time);
>> >+                consolidatedExecTime.addAndGet(time);
>> >+                consolidatedNodes.addAndGet(updates);
>> >+            }
>> >+
>> >+            public CompositeData getExecutionCount() {
>> >+                return
>>TimeSeriesStatsUtil.asCompositeData(execCounter,
>> >"ExecutionCount");
>> >+            }
>> >+
>> >+            public CompositeData getExecutionTime() {
>> >+                return TimeSeriesStatsUtil.asCompositeData(execTimer,
>> >"ExecutionTime");
>> >+            }
>> >+
>> >+            public CompositeData getConsolidatedStats() {
>> >+                try {
>> >+                    Long[] values = new
>> >Long[]{consolidatedExecRuns.longValue(),
>> >+                        consolidatedExecTime.longValue(),
>> >consolidatedNodes.longValue()};
>> >+                    return new CompositeDataSupport(consolidatedType,
>> >names, values);
>> >+                } catch (Exception e) {
>> >+                    log.error("Error retrieving consolidated stats",
>>e);
>> >+                    return null;
>> >+                }
>> >+            }
>> >+
>> >+            public void resetConsolidatedStats() {
>> >+                consolidatedExecRuns.set(0);
>> >+                consolidatedExecTime.set(0);
>> >+                consolidatedNodes.set(0);
>> >+            }
>> >+        }
>> >     }
>> >
>> >     /**
>> >
>> >Modified:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/RepositoryStats.java
>> >URL:
>> >
>> 
>>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/
>>o
>> 
>>>rg/apache/jackrabbit/oak/stats/RepositoryStats.java?rev=1635058&r1=16350
>>>57
>> >&r2=1635058&view=diff
>> 
>>>========================================================================
>>>==
>> >====
>> >---
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/RepositoryStats.java (original)
>> >+++
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/RepositoryStats.java Wed Oct 29 06:17:00 2014
>> >@@ -35,13 +35,7 @@ import static org.apache.jackrabbit.api.
>> > import static
>> 
>>>org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_
>>>CO
>> >UNTER;
>> > import static
>> 
>>>org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_
>>>DU
>> >RATION;
>> >
>> >-import javax.management.openmbean.ArrayType;
>> > import javax.management.openmbean.CompositeData;
>> >-import javax.management.openmbean.CompositeDataSupport;
>> >-import javax.management.openmbean.CompositeType;
>> >-import javax.management.openmbean.OpenDataException;
>> >-import javax.management.openmbean.OpenType;
>> >-import javax.management.openmbean.SimpleType;
>> >
>> > import org.apache.jackrabbit.api.stats.RepositoryStatistics;
>> > import org.apache.jackrabbit.api.stats.TimeSeries;
>> >@@ -132,39 +126,16 @@ public class RepositoryStats implements
>> >
>> >     @Override
>> >     public CompositeData getObservationQueueMaxLength() {
>> >-        return asCompositeData(maxQueueLength, "maximal length of
>> >observation queue");
>> >+        return TimeSeriesStatsUtil
>> >+            .asCompositeData(maxQueueLength, "maximal length of
>> >observation queue");
>> >     }
>> >
>> >-    public static final String[] ITEM_NAMES = new String[] {
>> >-            "per second", "per minute", "per hour", "per week"};
>> >-
>> >     private TimeSeries getTimeSeries(Type type) {
>> >         return repoStats.getTimeSeries(type);
>> >     }
>> >
>> >     private CompositeData asCompositeData(Type type) {
>> >-        return asCompositeData(getTimeSeries(type), type.name());
>> >-    }
>> >-
>> >-    private static CompositeData asCompositeData(TimeSeries
>>timeSeries,
>> >String name) {
>> >-        try {
>> >-            long[][] values = new long[][] {
>> >-                timeSeries.getValuePerSecond(),
>> >-                timeSeries.getValuePerMinute(),
>> >-                timeSeries.getValuePerHour(),
>> >-                timeSeries.getValuePerWeek()};
>> >-            return new CompositeDataSupport(getCompositeType(name),
>> >ITEM_NAMES, values);
>> >-        } catch (Exception e) {
>> >-            LOG.error("Error creating CompositeData instance from
>> >TimeSeries", e);
>> >-            return null;
>> >-        }
>> >-    }
>> >-
>> >-    private static CompositeType getCompositeType(String name) throws
>> >OpenDataException {
>> >-        ArrayType<int[]> longArrayType = new
>> >ArrayType<int[]>(SimpleType.LONG, true);
>> >-        OpenType<?>[] itemTypes = new OpenType[] {
>> >-                longArrayType, longArrayType, longArrayType,
>> >longArrayType};
>> >-        return new CompositeType(name, name + " time series",
>> >ITEM_NAMES, ITEM_NAMES, itemTypes);
>> >+        return
>>TimeSeriesStatsUtil.asCompositeData(getTimeSeries(type),
>> >type.name());
>> >     }
>> >
>> > }
>> >
>> >Added:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java
>> >URL:
>> >
>> 
>>http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/
>>o
>> 
>>>rg/apache/jackrabbit/oak/stats/TimeSeriesStatsUtil.java?rev=1635058&view
>>>=a
>> >uto
>> 
>>>========================================================================
>>>==
>> >====
>> >---
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java (added)
>> >+++
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java Wed Oct 29 06:17:00 2014
>> >@@ -0,0 +1,58 @@
>> >+/*
>> >+ * Licensed to the Apache Software Foundation (ASF) under one
>> >+ * or more contributor license agreements.  See the NOTICE file
>> >+ * distributed with this work for additional information
>> >+ * regarding copyright ownership.  The ASF licenses this file
>> >+ * to you under the Apache License, Version 2.0 (the
>> >+ * "License"); you may not use this file except in compliance
>> >+ * with the License.  You may obtain a copy of the License at
>> >+ *
>> >+ *   http://www.apache.org/licenses/LICENSE-2.0
>> >+ *
>> >+ * Unless required by applicable law or agreed to in writing,
>> >+ * software distributed under the License is distributed on an
>> >+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
>> >+ * KIND, either express or implied.  See the License for the
>> >+ * specific language governing permissions and limitations
>> >+ * under the License.
>> >+ */
>> >+package org.apache.jackrabbit.oak.stats;
>> >+
>> >+import org.apache.jackrabbit.api.stats.TimeSeries;
>> >+import org.slf4j.Logger;
>> >+import org.slf4j.LoggerFactory;
>> >+
>> >+import javax.management.openmbean.ArrayType;
>> >+import javax.management.openmbean.CompositeData;
>> >+import javax.management.openmbean.CompositeDataSupport;
>> >+import javax.management.openmbean.CompositeType;
>> >+import javax.management.openmbean.OpenDataException;
>> >+import javax.management.openmbean.OpenType;
>> >+import javax.management.openmbean.SimpleType;
>> >+
>> >+/**
>> >+ * Utility class for retrieving {@link
>> >javax.management.openmbean.CompositeData} for
>> >+ * {@link org.apache.jackrabbit.api.stats.TimeSeries}.
>> >+ */
>> >+public class TimeSeriesStatsUtil {
>> >+    public static final String[] ITEM_NAMES = new String[] {"per
>> >second", "per minute", "per hour", "per week"};
>> >+
>> >+    private static final Logger LOG =
>> >LoggerFactory.getLogger(TimeSeriesStatsUtil.class);
>> >+
>> >+    public static CompositeData asCompositeData(TimeSeries timeSeries,
>> >String name) {
>> >+        try {
>> >+            long[][] values = new long[][]
>> >{timeSeries.getValuePerSecond(), timeSeries.getValuePerMinute(),
>> >+                timeSeries.getValuePerHour(),
>> >timeSeries.getValuePerWeek()};
>> >+            return new CompositeDataSupport(getCompositeType(name),
>> >ITEM_NAMES, values);
>> >+        } catch (Exception e) {
>> >+            LOG.error("Error creating CompositeData instance from
>> >TimeSeries", e);
>> >+            return null;
>> >+        }
>> >+    }
>> >+
>> >+    static CompositeType getCompositeType(String name) throws
>> >OpenDataException {
>> >+        ArrayType<int[]> longArrayType = new
>> >ArrayType<int[]>(SimpleType.LONG, true);
>> >+        OpenType<?>[] itemTypes = new OpenType[] {longArrayType,
>> >longArrayType, longArrayType, longArrayType};
>> >+        return new CompositeType(name, name + " time series",
>> >ITEM_NAMES, ITEM_NAMES, itemTypes);
>> >+    }
>> >+}
>> >
>> >Propchange:
>> 
>>>jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/st
>>>at
>> >s/TimeSeriesStatsUtil.java
>> 
>>>------------------------------------------------------------------------
>>>--
>> >----
>> >    svn:eol-style = native
>> >
>> >
>>
>>


Re: svn commit: r1635058 - in /jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak: api/jmx/IndexStatsMBean.java plugins/index/AsyncIndexUpdate.java stats/RepositoryStats.java stats/TimeSeriesStatsUtil.java

Posted by Amit Jain <am...@ieee.org>.
Hi,

>> Would it be an option to use the ScheduledExecutorService from
>> org.apache.jackrabbit.oak.Oak?

In revision 1635436, I removed the custom executor service and used the
whiteboard instance to do the scheduling in org.apache.jackrabbit.oak.Oak.

Thanks
Amit

On Wed, Oct 29, 2014 at 10:10 PM, Marcel Reutegger <mr...@adobe.com>
wrote:

> Hi,
>
> I think this commit causes OOME when the build is run with the node store
> fixtures for MongoDB and RDB.
>
> The heap dump shows many DocumentNodeStore instances, which cannot be
> garbage collected because the ScheduledExecutorService instance introduced
> with this change is still running and references the store.
>
> Would it be an option to use the ScheduledExecutorService from
> org.apache.jackrabbit.oak.Oak?
>
> Regards
>  Marcel
>
> On 29/10/14 07:17, "amitj@apache.org" <am...@apache.org> wrote:
>
> >Author: amitj
> >Date: Wed Oct 29 06:17:00 2014
> >New Revision: 1635058
> >
> >URL: http://svn.apache.org/r1635058
> >Log:
> >OAK-2230: Execution Stats for async indexing
> >Added resettable consolidated execution stats
> >TimeSeries for trends
> >
> >Added:
> >
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java   (with props)
> >Modified:
> >
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
> >jmx/IndexStatsMBean.java
> >
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
> >ins/index/AsyncIndexUpdate.java
> >
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/RepositoryStats.java
> >
> >Modified:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
> >jmx/IndexStatsMBean.java
> >URL:
> >
> http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
> >rg/apache/jackrabbit/oak/api/jmx/IndexStatsMBean.java?rev=1635058&r1=16350
> >57&r2=1635058&view=diff
> >==========================================================================
> >====
> >---
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
> >jmx/IndexStatsMBean.java (original)
> >+++
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/api/
> >jmx/IndexStatsMBean.java Wed Oct 29 06:17:00 2014
> >@@ -17,6 +17,8 @@
> >
> > package org.apache.jackrabbit.oak.api.jmx;
> >
> >+import javax.management.openmbean.CompositeData;
> >+
> > public interface IndexStatsMBean {
> >
> >     String TYPE = "IndexStats";
> >@@ -114,4 +116,28 @@ public interface IndexStatsMBean {
> >      */
> >     String getTemporaryCheckpoints();
> >
> >+    /**
> >+     * Returns the number of executions as a {@link
> >org.apache.jackrabbit.api.stats.TimeSeries}.
> >+     *
> >+     * @return the execution count time series
> >+     */
> >+    CompositeData getExecutionCount();
> >+
> >+    /**
> >+     * Returns the execution time as a {@link
> >org.apache.jackrabbit.api.stats.TimeSeries}.
> >+     *
> >+     * @return the execution times time series
> >+     */
> >+    CompositeData getExecutionTime();
> >+
> >+    /**
> >+     * Returns the consolidated execution stats since last reset
> >+     * @return consolidated execution stats
> >+     */
> >+    CompositeData getConsolidatedExecutionStats();
> >+
> >+    /**
> >+     * Resets the consolidated stats.
> >+     */
> >+    void resetConsolidatedExecutionStats();
> > }
> >
> >Modified:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
> >ins/index/AsyncIndexUpdate.java
> >URL:
> >
> http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
> >rg/apache/jackrabbit/oak/plugins/index/AsyncIndexUpdate.java?rev=1635058&r
> >1=1635057&r2=1635058&view=diff
> >==========================================================================
> >====
> >---
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
> >ins/index/AsyncIndexUpdate.java (original)
> >+++
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/plug
> >ins/index/AsyncIndexUpdate.java Wed Oct 29 06:17:00 2014
> >@@ -28,10 +28,22 @@ import static org.apache.jackrabbit.oak.
> > import java.util.Calendar;
> > import java.util.HashSet;
> > import java.util.Set;
> >+import java.util.concurrent.Executors;
> >+import java.util.concurrent.ScheduledExecutorService;
> >+import java.util.concurrent.ThreadFactory;
> > import java.util.concurrent.TimeUnit;
> >+import java.util.concurrent.atomic.AtomicInteger;
> >+import java.util.concurrent.atomic.AtomicLong;
> >
> > import javax.annotation.Nonnull;
> >+import javax.management.openmbean.CompositeData;
> >+import javax.management.openmbean.CompositeDataSupport;
> >+import javax.management.openmbean.CompositeType;
> >+import javax.management.openmbean.OpenDataException;
> >+import javax.management.openmbean.OpenType;
> >+import javax.management.openmbean.SimpleType;
> >
> >+import com.google.common.base.Stopwatch;
> > import org.apache.jackrabbit.oak.api.CommitFailedException;
> > import org.apache.jackrabbit.oak.api.PropertyState;
> > import org.apache.jackrabbit.oak.api.Type;
> >@@ -51,6 +63,8 @@ import org.apache.jackrabbit.oak.spi.sta
> > import org.apache.jackrabbit.oak.spi.state.NodeState;
> > import org.apache.jackrabbit.oak.spi.state.NodeStateDiff;
> > import org.apache.jackrabbit.oak.spi.state.NodeStore;
> >+import org.apache.jackrabbit.oak.stats.TimeSeriesStatsUtil;
> >+import org.apache.jackrabbit.stats.TimeSeriesRecorder;
> > import org.apache.jackrabbit.util.ISO8601;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >@@ -380,6 +394,7 @@ public class AsyncIndexUpdate implements
> >                     reindexedDefinitions.clear();
> >                 }
> >                 postAsyncRunStatsStatus(indexStats);
> >+                indexStats.resetConsolidatedExecutionStats();
> >             }
> >             mergeWithConcurrencyCheck(builder, beforeCheckpoint,
> >callback.lease);
> >         } finally {
> >@@ -443,17 +458,29 @@ public class AsyncIndexUpdate implements
> >
> >         private volatile boolean isPaused;
> >         private volatile long updates;
> >+        private Stopwatch watch = Stopwatch.createUnstarted();
> >+        private ExecutionStats execStats = new ExecutionStats();
> >
> >         public void start(String now) {
> >             status = STATUS_RUNNING;
> >             start = now;
> >             done = "";
> >+
> >+            if (watch.isRunning()) {
> >+                watch.reset();
> >+            }
> >+            watch.start();
> >         }
> >
> >         public void done(String now) {
> >             status = STATUS_DONE;
> >-            start = "";
> >             done = now;
> >+            if (watch.isRunning()) {
> >+                watch.stop();
> >+            }
> >+            execStats.incrementCounter();
> >+
> >execStats.recordExecution(watch.elapsed(TimeUnit.MILLISECONDS), updates);
> >+            watch.reset();
> >         }
> >
> >         @Override
> >@@ -534,6 +561,26 @@ public class AsyncIndexUpdate implements
> >         }
> >
> >         @Override
> >+        public CompositeData getExecutionCount() {
> >+            return execStats.getExecutionCount();
> >+        }
> >+
> >+        @Override
> >+        public CompositeData getExecutionTime() {
> >+            return execStats.getExecutionTime();
> >+        }
> >+
> >+        @Override
> >+        public CompositeData getConsolidatedExecutionStats() {
> >+            return execStats.getConsolidatedStats();
> >+        }
> >+
> >+        @Override
> >+        public void resetConsolidatedExecutionStats() {
> >+            execStats.resetConsolidatedStats();
> >+        }
> >+
> >+        @Override
> >         public String toString() {
> >             return "AsyncIndexStats [start=" + start + ", done=" + done
> >                     + ", status=" + status + ", paused=" + isPaused
> >@@ -541,6 +588,86 @@ public class AsyncIndexUpdate implements
> >                     + referenceCp + ", processedCheckpoint=" +
> >processedCp
> >                     + " ,tempCheckpoints=" + tempCps + " ]";
> >         }
> >+
> >+        class ExecutionStats {
> >+            final ScheduledExecutorService executor;
> >+            final TimeSeriesRecorder execCounter;
> >+            final TimeSeriesRecorder execTimer;
> >+
> >+            /**
> >+             * Captures consolidated execution stats since last reset
> >+             */
> >+            final AtomicLong consolidatedExecTime = new AtomicLong();
> >+            final AtomicInteger consolidatedExecRuns = new
> >AtomicInteger();
> >+            final AtomicLong consolidatedNodes = new AtomicLong();
> >+            final String[] names = {"Executions", "Execution Time",
> >"Nodes"};
> >+            CompositeType consolidatedType;
> >+
> >+            ExecutionStats() {
> >+                executor =
> >Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
> >+                    @Override
> >+                    public Thread newThread(@Nonnull Runnable r) {
> >+                        Thread thread = new Thread(r,
> >"AsyncIndexStats-ExecutionsStats");
> >+                        thread.setDaemon(true);
> >+                        return thread;
> >+                    }
> >+                });
> >+                execCounter = new TimeSeriesRecorder(true);
> >+                execTimer = new TimeSeriesRecorder(true);
> >+
> >+                executor.scheduleAtFixedRate(new Runnable() {
> >+                    public void run() {
> >+                        execCounter.recordOneSecond();
> >+                        execTimer.recordOneSecond();
> >+                    }
> >+                }, 1, 1, TimeUnit.SECONDS);
> >+
> >+                try {
> >+                    consolidatedType = new
> >CompositeType("ConsolidatedStats",
> >+                        "Consolidated stats", names,
> >+                        names,
> >+                        new OpenType[] {SimpleType.LONG,
> >SimpleType.LONG, SimpleType.LONG});
> >+                } catch (OpenDataException e) {
> >+                    log.warn("Error in creating CompositeType for
> >consolidated stats", e);
> >+                }
> >+            }
> >+
> >+            void incrementCounter() {
> >+                execCounter.getCounter().incrementAndGet();
> >+                consolidatedExecRuns.incrementAndGet();
> >+            }
> >+
> >+            void recordExecution(long time, long updates) {
> >+                execTimer.getCounter().addAndGet(time);
> >+                consolidatedExecTime.addAndGet(time);
> >+                consolidatedNodes.addAndGet(updates);
> >+            }
> >+
> >+            public CompositeData getExecutionCount() {
> >+                return TimeSeriesStatsUtil.asCompositeData(execCounter,
> >"ExecutionCount");
> >+            }
> >+
> >+            public CompositeData getExecutionTime() {
> >+                return TimeSeriesStatsUtil.asCompositeData(execTimer,
> >"ExecutionTime");
> >+            }
> >+
> >+            public CompositeData getConsolidatedStats() {
> >+                try {
> >+                    Long[] values = new
> >Long[]{consolidatedExecRuns.longValue(),
> >+                        consolidatedExecTime.longValue(),
> >consolidatedNodes.longValue()};
> >+                    return new CompositeDataSupport(consolidatedType,
> >names, values);
> >+                } catch (Exception e) {
> >+                    log.error("Error retrieving consolidated stats", e);
> >+                    return null;
> >+                }
> >+            }
> >+
> >+            public void resetConsolidatedStats() {
> >+                consolidatedExecRuns.set(0);
> >+                consolidatedExecTime.set(0);
> >+                consolidatedNodes.set(0);
> >+            }
> >+        }
> >     }
> >
> >     /**
> >
> >Modified:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/RepositoryStats.java
> >URL:
> >
> http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
> >rg/apache/jackrabbit/oak/stats/RepositoryStats.java?rev=1635058&r1=1635057
> >&r2=1635058&view=diff
> >==========================================================================
> >====
> >---
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/RepositoryStats.java (original)
> >+++
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/RepositoryStats.java Wed Oct 29 06:17:00 2014
> >@@ -35,13 +35,7 @@ import static org.apache.jackrabbit.api.
> > import static
> >org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_CO
> >UNTER;
> > import static
> >org.apache.jackrabbit.api.stats.RepositoryStatistics.Type.SESSION_WRITE_DU
> >RATION;
> >
> >-import javax.management.openmbean.ArrayType;
> > import javax.management.openmbean.CompositeData;
> >-import javax.management.openmbean.CompositeDataSupport;
> >-import javax.management.openmbean.CompositeType;
> >-import javax.management.openmbean.OpenDataException;
> >-import javax.management.openmbean.OpenType;
> >-import javax.management.openmbean.SimpleType;
> >
> > import org.apache.jackrabbit.api.stats.RepositoryStatistics;
> > import org.apache.jackrabbit.api.stats.TimeSeries;
> >@@ -132,39 +126,16 @@ public class RepositoryStats implements
> >
> >     @Override
> >     public CompositeData getObservationQueueMaxLength() {
> >-        return asCompositeData(maxQueueLength, "maximal length of
> >observation queue");
> >+        return TimeSeriesStatsUtil
> >+            .asCompositeData(maxQueueLength, "maximal length of
> >observation queue");
> >     }
> >
> >-    public static final String[] ITEM_NAMES = new String[] {
> >-            "per second", "per minute", "per hour", "per week"};
> >-
> >     private TimeSeries getTimeSeries(Type type) {
> >         return repoStats.getTimeSeries(type);
> >     }
> >
> >     private CompositeData asCompositeData(Type type) {
> >-        return asCompositeData(getTimeSeries(type), type.name());
> >-    }
> >-
> >-    private static CompositeData asCompositeData(TimeSeries timeSeries,
> >String name) {
> >-        try {
> >-            long[][] values = new long[][] {
> >-                timeSeries.getValuePerSecond(),
> >-                timeSeries.getValuePerMinute(),
> >-                timeSeries.getValuePerHour(),
> >-                timeSeries.getValuePerWeek()};
> >-            return new CompositeDataSupport(getCompositeType(name),
> >ITEM_NAMES, values);
> >-        } catch (Exception e) {
> >-            LOG.error("Error creating CompositeData instance from
> >TimeSeries", e);
> >-            return null;
> >-        }
> >-    }
> >-
> >-    private static CompositeType getCompositeType(String name) throws
> >OpenDataException {
> >-        ArrayType<int[]> longArrayType = new
> >ArrayType<int[]>(SimpleType.LONG, true);
> >-        OpenType<?>[] itemTypes = new OpenType[] {
> >-                longArrayType, longArrayType, longArrayType,
> >longArrayType};
> >-        return new CompositeType(name, name + " time series",
> >ITEM_NAMES, ITEM_NAMES, itemTypes);
> >+        return TimeSeriesStatsUtil.asCompositeData(getTimeSeries(type),
> >type.name());
> >     }
> >
> > }
> >
> >Added:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java
> >URL:
> >
> http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-core/src/main/java/o
> >rg/apache/jackrabbit/oak/stats/TimeSeriesStatsUtil.java?rev=1635058&view=a
> >uto
> >==========================================================================
> >====
> >---
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java (added)
> >+++
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java Wed Oct 29 06:17:00 2014
> >@@ -0,0 +1,58 @@
> >+/*
> >+ * Licensed to the Apache Software Foundation (ASF) under one
> >+ * or more contributor license agreements.  See the NOTICE file
> >+ * distributed with this work for additional information
> >+ * regarding copyright ownership.  The ASF licenses this file
> >+ * to you under the Apache License, Version 2.0 (the
> >+ * "License"); you may not use this file except in compliance
> >+ * with the License.  You may obtain a copy of the License at
> >+ *
> >+ *   http://www.apache.org/licenses/LICENSE-2.0
> >+ *
> >+ * Unless required by applicable law or agreed to in writing,
> >+ * software distributed under the License is distributed on an
> >+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
> >+ * KIND, either express or implied.  See the License for the
> >+ * specific language governing permissions and limitations
> >+ * under the License.
> >+ */
> >+package org.apache.jackrabbit.oak.stats;
> >+
> >+import org.apache.jackrabbit.api.stats.TimeSeries;
> >+import org.slf4j.Logger;
> >+import org.slf4j.LoggerFactory;
> >+
> >+import javax.management.openmbean.ArrayType;
> >+import javax.management.openmbean.CompositeData;
> >+import javax.management.openmbean.CompositeDataSupport;
> >+import javax.management.openmbean.CompositeType;
> >+import javax.management.openmbean.OpenDataException;
> >+import javax.management.openmbean.OpenType;
> >+import javax.management.openmbean.SimpleType;
> >+
> >+/**
> >+ * Utility class for retrieving {@link
> >javax.management.openmbean.CompositeData} for
> >+ * {@link org.apache.jackrabbit.api.stats.TimeSeries}.
> >+ */
> >+public class TimeSeriesStatsUtil {
> >+    public static final String[] ITEM_NAMES = new String[] {"per
> >second", "per minute", "per hour", "per week"};
> >+
> >+    private static final Logger LOG =
> >LoggerFactory.getLogger(TimeSeriesStatsUtil.class);
> >+
> >+    public static CompositeData asCompositeData(TimeSeries timeSeries,
> >String name) {
> >+        try {
> >+            long[][] values = new long[][]
> >{timeSeries.getValuePerSecond(), timeSeries.getValuePerMinute(),
> >+                timeSeries.getValuePerHour(),
> >timeSeries.getValuePerWeek()};
> >+            return new CompositeDataSupport(getCompositeType(name),
> >ITEM_NAMES, values);
> >+        } catch (Exception e) {
> >+            LOG.error("Error creating CompositeData instance from
> >TimeSeries", e);
> >+            return null;
> >+        }
> >+    }
> >+
> >+    static CompositeType getCompositeType(String name) throws
> >OpenDataException {
> >+        ArrayType<int[]> longArrayType = new
> >ArrayType<int[]>(SimpleType.LONG, true);
> >+        OpenType<?>[] itemTypes = new OpenType[] {longArrayType,
> >longArrayType, longArrayType, longArrayType};
> >+        return new CompositeType(name, name + " time series",
> >ITEM_NAMES, ITEM_NAMES, itemTypes);
> >+    }
> >+}
> >
> >Propchange:
> >jackrabbit/oak/trunk/oak-core/src/main/java/org/apache/jackrabbit/oak/stat
> >s/TimeSeriesStatsUtil.java
> >--------------------------------------------------------------------------
> >----
> >    svn:eol-style = native
> >
> >
>
>