You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by rm...@apache.org on 2016/04/27 18:13:58 UTC
svn commit: r1741282 - in /incubator/sirona/trunk: ./
api/src/main/java/org/apache/sirona/counters/ store/memory/
store/memory/src/main/java/org/apache/sirona/store/memory/counter/
store/memory/src/test/ store/memory/src/test/java/ store/memory/src/tes...
Author: rmannibucau
Date: Wed Apr 27 16:13:58 2016
New Revision: 1741282
URL: http://svn.apache.org/viewvc?rev=1741282&view=rev
Log:
SIRONA-63 starting to impl exponential decay counter
Added:
incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/LockableCounter.java
incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounter.java
incubator/sirona/trunk/store/memory/src/test/
incubator/sirona/trunk/store/memory/src/test/java/
incubator/sirona/trunk/store/memory/src/test/java/org/
incubator/sirona/trunk/store/memory/src/test/java/org/apache/
incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/
incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/store/
incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/store/memory/
incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/store/memory/counter/
incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounterTest.java
Modified:
incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/DefaultCounter.java
incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/OptimizedStatistics.java
incubator/sirona/trunk/pom.xml
incubator/sirona/trunk/store/memory/pom.xml
incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/InMemoryCounterDataStore.java
incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/LimitedInMemoryCounterDataStore.java
Modified: incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/DefaultCounter.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/DefaultCounter.java?rev=1741282&r1=1741281&r2=1741282&view=diff
==============================================================================
--- incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/DefaultCounter.java (original)
+++ incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/DefaultCounter.java Wed Apr 27 16:13:58 2016
@@ -18,76 +18,38 @@ package org.apache.sirona.counters;
import org.apache.sirona.store.counter.CounterDataStore;
-import javax.management.ObjectName;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class DefaultCounter implements Counter {
- private final AtomicInteger concurrency = new AtomicInteger(0);
- private final Key key;
- private final CounterDataStore dataStore;
- private volatile int maxConcurrency = 0;
+public class DefaultCounter extends LockableCounter {
protected final OptimizedStatistics statistics;
- protected final ReadWriteLock lock = new ReentrantReadWriteLock();
- private ObjectName jmx = null;
public DefaultCounter(final Key key, final CounterDataStore store) {
this(key, store, new OptimizedStatistics());
}
public DefaultCounter(final Key key, final CounterDataStore store, final OptimizedStatistics statistics) {
- this.key = key;
- this.dataStore = store;
-
+ super(key, store);
this.statistics = statistics;
}
- public void addInternal(final double delta) { // should be called from a thread safe environment
- statistics.addValue(delta);
- }
-
- @Override
- public void updateConcurrency(final int concurrency) {
- if (concurrency > maxConcurrency) {
- maxConcurrency = concurrency;
+ public void addInternal(final double delta) {
+ final Lock lock = getLock().writeLock();
+ lock.lock();
+ try {
+ statistics.addValue(delta);
+ } finally {
+ lock.unlock();
}
}
@Override
- public int getMaxConcurrency() {
- return maxConcurrency;
- }
-
- @Override
- public AtomicInteger currentConcurrency() {
- return concurrency;
- }
-
- @Override
- public Key getKey() {
- return key;
- }
-
- @Override
public void reset() {
statistics.clear();
- maxConcurrency = 0;
- }
-
- @Override
- public void add(final double delta) {
- dataStore.addToCounter(this, delta);
- }
-
- @Override
- public void add(final double delta, final Unit deltaUnit) {
- add(key.getRole().getUnit().convert(delta, deltaUnit));
+ super.reset();
}
@Override
public double getMax() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.getMax();
@@ -98,7 +60,7 @@ public class DefaultCounter implements C
@Override
public double getMin() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.getMin();
@@ -109,7 +71,7 @@ public class DefaultCounter implements C
@Override
public double getSum() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.getSum();
@@ -120,7 +82,7 @@ public class DefaultCounter implements C
@Override
public double getStandardDeviation() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.getStandardDeviation();
@@ -131,7 +93,7 @@ public class DefaultCounter implements C
@Override
public double getVariance() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.getVariance();
@@ -142,7 +104,7 @@ public class DefaultCounter implements C
@Override
public double getMean() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.getMean();
@@ -153,7 +115,7 @@ public class DefaultCounter implements C
@Override
public double getSecondMoment() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.getSecondMoment();
@@ -164,7 +126,7 @@ public class DefaultCounter implements C
@Override
public long getHits() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.getN();
@@ -173,8 +135,9 @@ public class DefaultCounter implements C
}
}
+ @Override
public OptimizedStatistics getStatistics() {
- final Lock rl = lock.readLock();
+ final Lock rl = getLock().readLock();
rl.lock();
try {
return statistics.copy();
@@ -183,44 +146,13 @@ public class DefaultCounter implements C
}
}
- public ReadWriteLock getLock() {
- return lock;
- }
-
- public void setJmx(final ObjectName jmx) {
- this.jmx = jmx;
- }
-
- public ObjectName getJmx() {
- return jmx;
- }
-
@Override
public String toString() {
return "DefaultCounter{" +
- "concurrency=" + concurrency +
- ", key=" + key +
- ", dataStore=" + dataStore +
- ", maxConcurrency=" + maxConcurrency +
+ "concurrency=" + currentConcurrency().get() +
+ ", key=" + getKey() +
+ ", maxConcurrency=" + getMaxConcurrency() +
", statistics=" + statistics +
'}';
}
-
- @Override
- public boolean equals(final Object o) {
- if (this == o) {
- return true;
- }
- if (!Counter.class.isInstance(o)) {
- return false;
- }
-
- final Counter that = Counter.class.cast(o);
- return key.equals(that.getKey());
- }
-
- @Override
- public int hashCode() {
- return key.hashCode();
- }
}
Added: incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/LockableCounter.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/LockableCounter.java?rev=1741282&view=auto
==============================================================================
--- incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/LockableCounter.java (added)
+++ incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/LockableCounter.java Wed Apr 27 16:13:58 2016
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sirona.counters;
+
+import org.apache.sirona.store.counter.CounterDataStore;
+
+import javax.management.ObjectName;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public abstract class LockableCounter implements Counter {
+ private final Key key;
+ private final CounterDataStore dataStore;
+ protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private final AtomicInteger concurrency = new AtomicInteger(0);
+ private volatile int maxConcurrency = 0;
+ private ObjectName jmx = null;
+
+ protected LockableCounter(final Key key, final CounterDataStore dataStore) {
+ this.key = key;
+ this.dataStore = dataStore;
+ }
+
+ public abstract void addInternal(double delta);
+ public abstract OptimizedStatistics getStatistics();
+
+ @Override
+ public void add(final double delta) {
+ dataStore.addToCounter(this, delta);
+ }
+
+ @Override
+ public void add(final double delta, final Unit deltaUnit) {
+ add(getKey().getRole().getUnit().convert(delta, deltaUnit));
+ }
+
+ @Override
+ public Key getKey() {
+ return key;
+ }
+
+ @Override
+ public void updateConcurrency(final int concurrency) {
+ if (concurrency > maxConcurrency) {
+ maxConcurrency = concurrency;
+ }
+ }
+
+ @Override
+ public int getMaxConcurrency() {
+ return maxConcurrency;
+ }
+
+ @Override
+ public AtomicInteger currentConcurrency() {
+ return concurrency;
+ }
+
+ public void setJmx(final ObjectName jmx) {
+ this.jmx = jmx;
+ }
+
+ public ObjectName getJmx() {
+ return jmx;
+ }
+
+ public ReadWriteLock getLock() {
+ return lock;
+ }
+
+ public void reset() {
+ maxConcurrency = 0;
+ }
+
+ @Override
+ public double getMax() {
+ return getStatistics().getMax();
+ }
+
+ @Override
+ public double getMin() {
+ return getStatistics().getMin();
+ }
+
+ @Override
+ public long getHits() {
+ return getStatistics().getN();
+ }
+
+ @Override
+ public double getSum() {
+ return getStatistics().getSum();
+ }
+
+ @Override
+ public double getStandardDeviation() {
+ return getStatistics().getStandardDeviation();
+ }
+
+ @Override
+ public double getVariance() {
+ return getStatistics().getVariance();
+ }
+
+ @Override
+ public double getMean() {
+ return getStatistics().getMean();
+ }
+
+ @Override
+ public double getSecondMoment() {
+ return getStatistics().getSecondMoment();
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!Counter.class.isInstance(o)) {
+ return false;
+ }
+
+ final Counter that = Counter.class.cast(o);
+ return getKey().equals(that.getKey());
+ }
+
+ @Override
+ public int hashCode() {
+ return getKey().hashCode();
+ }
+}
Modified: incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/OptimizedStatistics.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/OptimizedStatistics.java?rev=1741282&r1=1741281&r2=1741282&view=diff
==============================================================================
--- incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/OptimizedStatistics.java (original)
+++ incubator/sirona/trunk/api/src/main/java/org/apache/sirona/counters/OptimizedStatistics.java Wed Apr 27 16:13:58 2016
@@ -17,10 +17,10 @@
package org.apache.sirona.counters;
public class OptimizedStatistics {
- private long n = 0;
- private double sum = 0;
- private double min = Double.NaN;
- private double max = Double.NaN;
+ protected long n = 0;
+ protected double sum = 0;
+ protected double min = Double.NaN;
+ protected double max = Double.NaN;
// first moment (mean)
protected double m1 = Double.NaN;
Modified: incubator/sirona/trunk/pom.xml
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/pom.xml?rev=1741282&r1=1741281&r2=1741282&view=diff
==============================================================================
--- incubator/sirona/trunk/pom.xml (original)
+++ incubator/sirona/trunk/pom.xml Wed Apr 27 16:13:58 2016
@@ -417,7 +417,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
+ <version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
Modified: incubator/sirona/trunk/store/memory/pom.xml
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/store/memory/pom.xml?rev=1741282&r1=1741281&r2=1741282&view=diff
==============================================================================
--- incubator/sirona/trunk/store/memory/pom.xml (original)
+++ incubator/sirona/trunk/store/memory/pom.xml Wed Apr 27 16:13:58 2016
@@ -33,6 +33,10 @@
<groupId>org.apache.sirona</groupId>
<artifactId>sirona-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
</dependencies>
</project>
Added: incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounter.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounter.java?rev=1741282&view=auto
==============================================================================
--- incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounter.java (added)
+++ incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounter.java Wed Apr 27 16:13:58 2016
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sirona.store.memory.counter;
+
+import org.apache.sirona.counters.LockableCounter;
+import org.apache.sirona.counters.OptimizedStatistics;
+import org.apache.sirona.store.counter.CounterDataStore;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+
+// exponential decay sampling implementation
+// inspired from http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf
+//
+// @Experimental, likely needs some more love for m2 computation
+public class ExponentialDecayCounter extends LockableCounter {
+ public static final double ACCEPTABLE_DEFAULT_ALPHA = 0.015;
+ public static final int ACCEPTABLE_DEFAULT_SIZE = 512;
+ public static final long ACCEPTABLE_STATISTICS_REFRESH_SECONDS = 5;
+
+ private final double alpha;
+ private final double samplingSize;
+ private final long refreshStatInterval;
+
+ private final AtomicInteger currentCount = new AtomicInteger();
+ private volatile long recomputeAt;
+ private volatile long becameAt;
+ private volatile long computedStatsAt;
+ private final ConcurrentSkipListMap<Double, Double> values = new ConcurrentSkipListMap<Double, Double>();
+ private volatile OptimizedStatistics currentStats;
+
+ public ExponentialDecayCounter(final Key key, final CounterDataStore store,
+ final double alpha, final int samplingSize, final long refreshStatInterval) {
+ super(key, store);
+ this.alpha = alpha;
+ this.samplingSize = samplingSize;
+ this.refreshStatInterval = refreshStatInterval;
+ reset();
+ }
+
+ protected long seconds() {
+ return TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
+ }
+
+ @Override
+ public void addInternal(final double delta) {
+ final long now = seconds();
+
+ final Lock rl = getLock().readLock();
+ rl.lock();
+ try {
+ final double priority = Math.exp(alpha * (now - becameAt)) / Math.random();
+ if (currentCount.incrementAndGet() > samplingSize) { // we need to remove 1 sample to respect samplingSize
+ double head = values.firstKey();
+ if (priority > head && values.putIfAbsent(priority, delta) == null) {
+ while (values.remove(values.firstKey()) == null) {
+ // no-op
+ }
+ }
+ } else {
+ values.put(priority, delta);
+ }
+ } finally {
+ rl.unlock();
+ }
+
+ if (now >= recomputeAt) {
+ if (recomputeAt == now) {
+ return;
+ }
+ final Lock lock = getLock().writeLock();
+ lock.lock();
+ try {
+ final long currentLoopBecame = becameAt;
+ becameAt = now;
+ recomputeAt = nextComputation();
+ final long timeDiff = becameAt - currentLoopBecame;
+
+ for (final Double priority : new ArrayList<Double>(values.keySet())) {
+ values.put(priority * Math.exp(-alpha * timeDiff), values.remove(priority));
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ @Override
+ public void reset() {
+ final long now = seconds();
+ final Lock lock = getLock().writeLock();
+ lock.lock();
+ try {
+ values.clear();
+ this.currentCount.set(0);
+ this.becameAt = now;
+ this.recomputeAt = nextComputation();
+ super.reset();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private long nextComputation() {
+ return this.becameAt + TimeUnit.MILLISECONDS.toSeconds(TimeUnit.HOURS.toMillis(1));
+ }
+
+ @Override
+ public OptimizedStatistics getStatistics() {
+ if (computedStatsAt != 0 && System.currentTimeMillis() - computedStatsAt < refreshStatInterval) {
+ return currentStats;
+ }
+
+ final LazyOptimizedStatistics stat;
+ final Lock lock = getLock().readLock();
+ lock.lock();
+ try {
+ stat = new LazyOptimizedStatistics(new HashMap<Double, Double>(values));
+ computedStatsAt = seconds();
+ } finally {
+ lock.unlock();
+ }
+
+ stat.init(); // out of lock
+
+ return (currentStats = stat);
+ }
+
+ @Override
+ public String toString() {
+ return "ExponentialDecayCounter{" +
+ "key=" + getKey() +
+ ", stats=" + getStatistics() +
+ '}';
+ }
+
+ private static class LazyOptimizedStatistics extends OptimizedStatistics {
+ private final Map<Double, Double> values;
+
+ private LazyOptimizedStatistics(final Map<Double, Double> values) {
+ this.values = values;
+ }
+
+ @Override
+ public OptimizedStatistics addValue(final double value) {
+ throw new UnsupportedOperationException();
+ }
+
+ private void init() {
+ if (values.isEmpty()) {
+ return;
+ }
+
+ // computed stats init
+ n = values.size();
+ min = Double.NaN;
+ max = Double.NaN;
+ m1 = 0;
+ m2 = 0;
+ sum = 0;
+
+ // sum priorities
+ double prioritySum = 0;
+ for (final Double priority : values.keySet()) {
+ prioritySum += priority;
+ }
+
+ { // normalise priorities
+ for (final Double priority : new ArrayList<Double>(values.keySet())) {
+ values.put(priority / prioritySum, values.remove(priority));
+ }
+ }
+ { // min/max
+ for (final Double val : new ArrayList<Double>(values.values())) {
+ if (Double.isNaN(min)) { // init
+ max = min = val;
+ } else {
+ if (min > val) {
+ min = val;
+ }
+ if (max < val) {
+ max = val;
+ }
+ }
+ }
+ }
+ { // mean
+ for (final Map.Entry<Double, Double> priority : values.entrySet()) {
+ m1 += priority.getValue() * priority.getKey();
+ }
+ }
+ if (values.size() > 1) { // variance
+ for (final Map.Entry<Double, Double> priority : values.entrySet()) {
+ m2 += Math.pow(priority.getValue() - m1, 2) * priority.getKey();
+ }
+ m2 *= (n - 1);
+ }
+ { // sum (doesn't represent much - mathematically wrong - but gives an idea)
+ for (final Double v : values.values()) {
+ sum += v;
+ }
+ }
+
+ values.clear();
+ }
+ }
+}
Modified: incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/InMemoryCounterDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/InMemoryCounterDataStore.java?rev=1741282&r1=1741281&r2=1741282&view=diff
==============================================================================
--- incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/InMemoryCounterDataStore.java (original)
+++ incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/InMemoryCounterDataStore.java Wed Apr 27 16:13:58 2016
@@ -20,6 +20,7 @@ import org.apache.sirona.configuration.C
import org.apache.sirona.configuration.ioc.Destroying;
import org.apache.sirona.counters.Counter;
import org.apache.sirona.counters.DefaultCounter;
+import org.apache.sirona.counters.LockableCounter;
import org.apache.sirona.counters.MetricData;
import org.apache.sirona.counters.OptimizedStatistics;
import org.apache.sirona.counters.jmx.CounterJMX;
@@ -39,11 +40,20 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-public class InMemoryCounterDataStore implements CounterDataStore
-{
+public class InMemoryCounterDataStore implements CounterDataStore {
protected final boolean gauged = Configuration.is(Configuration.CONFIG_PROPERTY_PREFIX + "counter.with-gauge", false);
protected final boolean jmx = Configuration.is(Configuration.CONFIG_PROPERTY_PREFIX + "counter.with-jmx", false);
+ protected final boolean useExponentialDecay = Configuration.is(Configuration.CONFIG_PROPERTY_PREFIX + "counter.exponential-decay", false);
+ protected final double exponentialDecayAlpha = Double.parseDouble(
+ Configuration.getProperty(Configuration.CONFIG_PROPERTY_PREFIX + "counter.exponential-decay.alpha",
+ Double.toString(ExponentialDecayCounter.ACCEPTABLE_DEFAULT_ALPHA)));
+ protected final int exponentialDecaySamplingSize =
+ Configuration.getInteger(Configuration.CONFIG_PROPERTY_PREFIX + "counter.exponential-decay.sampling-size", ExponentialDecayCounter.ACCEPTABLE_DEFAULT_SIZE);
+ protected final long exponentialDecayRefresh =
+ (long) Configuration.getInteger(Configuration.CONFIG_PROPERTY_PREFIX + "counter.exponential-decay.refresh-seconds",
+ (int) ExponentialDecayCounter.ACCEPTABLE_STATISTICS_REFRESH_SECONDS);
+
protected final ConcurrentMap<Counter.Key, Counter> counters = newCounterMap();
protected final ConcurrentMap<Counter.Key, Collection<Gauge>> gauges = new ConcurrentHashMap<Counter.Key, Collection<Gauge>>();
protected final ReadWriteLock stateLock = new ReentrantReadWriteLock(); // this lock ensures consistency between createcounter and clearcounters
@@ -53,7 +63,9 @@ public class InMemoryCounterDataStore im
}
protected Counter newCounter(final Counter.Key key) {
- return new DefaultCounter(key, this);
+ return useExponentialDecay ?
+ new ExponentialDecayCounter(key, this, exponentialDecayAlpha, exponentialDecaySamplingSize, exponentialDecayRefresh) :
+ new DefaultCounter(key, this);
}
@Override
@@ -86,10 +98,10 @@ public class InMemoryCounterDataStore im
final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
try {
final ObjectName objectName = new ObjectName(
- Configuration.CONFIG_PROPERTY_PREFIX
- + "counter:role=" + escapeJmx(key.getRole().getName())
- + ",name=" + escapeJmx(key.getName()));
- DefaultCounter.class.cast(counter).setJmx(objectName);
+ Configuration.CONFIG_PROPERTY_PREFIX
+ + "counter:role=" + escapeJmx(key.getRole().getName())
+ + ",name=" + escapeJmx(key.getName()));
+ LockableCounter.class.cast(counter).setJmx(objectName);
if (!server.isRegistered(objectName)) {
server.registerMBean(new CounterJMX(counter), objectName);
@@ -124,7 +136,7 @@ public class InMemoryCounterDataStore im
final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
for (final Counter counter : counters.values()) {
try {
- server.unregisterMBean(DefaultCounter.class.cast(counter).getJmx());
+ server.unregisterMBean(LockableCounter.class.cast(counter).getJmx());
} catch (final Exception e) {
// no-op
}
@@ -153,18 +165,10 @@ public class InMemoryCounterDataStore im
@Override
public void addToCounter(final Counter counter, final double delta) {
- if (!DefaultCounter.class.isInstance(counter)) {
- throw new IllegalArgumentException(getClass().getName() + " only supports " + DefaultCounter.class.getName());
- }
-
- final DefaultCounter defaultCounter = DefaultCounter.class.cast(counter);
- final Lock lock = defaultCounter.getLock().writeLock();
- lock.lock();
- try {
- defaultCounter.addInternal(delta);
- } finally {
- lock.unlock();
+ if (!LockableCounter.class.isInstance(counter)) {
+ throw new IllegalArgumentException(getClass().getName() + " only supports " + LockableCounter.class.getName());
}
+ LockableCounter.class.cast(counter).addInternal(delta);
}
private static class SyncCounterGauge extends CounterGauge {
@@ -204,7 +208,7 @@ public class InMemoryCounterDataStore im
public synchronized void take() {
if (called == 3 || called == -1) {
- final DefaultCounter defaultCounter = DefaultCounter.class.cast(counter);
+ final LockableCounter defaultCounter = LockableCounter.class.cast(counter);
final Lock lock = defaultCounter.getLock().writeLock();
lock.lock();
try {
Modified: incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/LimitedInMemoryCounterDataStore.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/LimitedInMemoryCounterDataStore.java?rev=1741282&r1=1741281&r2=1741282&view=diff
==============================================================================
--- incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/LimitedInMemoryCounterDataStore.java (original)
+++ incubator/sirona/trunk/store/memory/src/main/java/org/apache/sirona/store/memory/counter/LimitedInMemoryCounterDataStore.java Wed Apr 27 16:13:58 2016
@@ -19,10 +19,11 @@ package org.apache.sirona.store.memory.c
import org.apache.sirona.Role;
import org.apache.sirona.configuration.Configuration;
import org.apache.sirona.counters.Counter;
-import org.apache.sirona.counters.DefaultCounter;
+import org.apache.sirona.counters.LockableCounter;
+import org.apache.sirona.counters.OptimizedStatistics;
+import org.apache.sirona.counters.Unit;
import org.apache.sirona.gauges.Gauge;
import org.apache.sirona.repositories.Repository;
-import org.apache.sirona.store.counter.CounterDataStore;
import javax.management.MBeanServer;
import javax.management.ObjectName;
@@ -34,6 +35,8 @@ import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
// ensure we don't explode memory cause of web counters, this class will be integrated in sirona > 0.2
public class LimitedInMemoryCounterDataStore extends InMemoryCounterDataStore
@@ -52,11 +55,11 @@ public class LimitedInMemoryCounterDataS
protected Counter newCounter(final Counter.Key key) {
if (ONLY_EVICT_WEB_COUNTERS) {
if (Role.WEB.equals(key.getRole())) {
- return new DefaultCounterTimestamped(key, this);
+ return new DefaultCounterTimestamped(LockableCounter.class.cast(super.newCounter(key)));
}
return super.newCounter(key);
}
- return new DefaultCounterTimestamped(key, this);
+ return new DefaultCounterTimestamped(LockableCounter.class.cast(super.newCounter(key)));
}
protected class FixedSizedMap extends ConcurrentSkipListMap<Counter.Key, Counter> {
@@ -142,7 +145,7 @@ public class LimitedInMemoryCounterDataS
}
if (jmx) {
try {
- final ObjectName objectName = DefaultCounter.class.cast(entry.getValue()).getJmx();
+ final ObjectName objectName = LockableCounter.class.cast(entry.getValue()).getJmx();
if (server.isRegistered(objectName)) {
server.unregisterMBean(objectName);
}
@@ -159,17 +162,114 @@ public class LimitedInMemoryCounterDataS
}
}
- private static class DefaultCounterTimestamped extends DefaultCounter {
+ private static class DefaultCounterTimestamped extends LockableCounter {
+ private final LockableCounter delegate;
private volatile long timestamp = System.currentTimeMillis();
- public DefaultCounterTimestamped(final Key key, final CounterDataStore store) {
- super(key, store);
+ public DefaultCounterTimestamped(final LockableCounter delegate) {
+ super(null, null);
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void addInternal(final double delta) {
+ this.delegate.add(delta);
+ }
+
+ @Override
+ public OptimizedStatistics getStatistics() {
+ return this.delegate.getStatistics();
+ }
+
+ @Override
+ public void setJmx(ObjectName jmx) {
+ this.delegate.setJmx(jmx);
+ }
+
+ @Override
+ public ObjectName getJmx() {
+ return this.delegate.getJmx();
+ }
+
+ @Override
+ public ReadWriteLock getLock() {
+ return this.delegate.getLock();
+ }
+
+ @Override
+ public void reset() {
+ this.delegate.reset();
}
@Override
public void add(final double delta) {
- super.add(delta);
+ delegate.add(delta);
timestamp = System.currentTimeMillis();
}
+
+ @Override
+ public void add(final double delta, final Unit unit) {
+ this.delegate.add(delta, unit);
+ }
+
+ @Override
+ public AtomicInteger currentConcurrency() {
+ return delegate.currentConcurrency();
+ }
+
+ @Override
+ public void updateConcurrency(final int concurrency) {
+ delegate.updateConcurrency(concurrency);
+ }
+
+ @Override
+ public int getMaxConcurrency() {
+ return delegate.getMaxConcurrency();
+ }
+
+ @Override
+ public double getMax() {
+ return delegate.getMax();
+ }
+
+ @Override
+ public double getMin() {
+ return delegate.getMin();
+ }
+
+ @Override
+ public long getHits() {
+ return delegate.getHits();
+ }
+
+ @Override
+ public double getSum() {
+ return delegate.getSum();
+ }
+
+ @Override
+ public double getStandardDeviation() {
+ return delegate.getStandardDeviation();
+ }
+
+ @Override
+ public double getVariance() {
+ return delegate.getVariance();
+ }
+
+ @Override
+ public double getMean() {
+ return delegate.getMean();
+ }
+
+ @Override
+ public double getSecondMoment() {
+ return delegate.getSecondMoment();
+ }
+
+ @Override
+ public Key getKey() {
+ return delegate.getKey();
+ }
}
}
Added: incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounterTest.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounterTest.java?rev=1741282&view=auto
==============================================================================
--- incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounterTest.java (added)
+++ incubator/sirona/trunk/store/memory/src/test/java/org/apache/sirona/store/memory/counter/ExponentialDecayCounterTest.java Wed Apr 27 16:13:58 2016
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sirona.store.memory.counter;
+
+import org.apache.sirona.Role;
+import org.apache.sirona.counters.Counter;
+import org.apache.sirona.counters.LockableCounter;
+import org.apache.sirona.counters.OptimizedStatistics;
+import org.apache.sirona.counters.Unit;
+import org.apache.sirona.store.counter.CounterDataStore;
+import org.junit.Test;
+
+import java.util.Collection;
+
+import static org.junit.Assert.assertEquals;
+
+public class ExponentialDecayCounterTest {
+ @Test
+ public void run() {
+ final ExponentialDecayCounter counter = new ExponentialDecayCounter(new Counter.Key(new Role("doctor", Unit.KILO), "weight"), new CounterDataStore() {
+ public Counter getOrCreateCounter(final Counter.Key key) {
+ throw new UnsupportedOperationException();
+ }
+
+ public void clearCounters() {
+ throw new UnsupportedOperationException();
+ }
+
+ public Collection<Counter> getCounters() {
+ throw new UnsupportedOperationException();
+ }
+
+ public void addToCounter(final Counter defaultCounter, final double delta) {
+ LockableCounter.class.cast(defaultCounter).addInternal(delta);
+ }
+ }, ExponentialDecayCounter.ACCEPTABLE_DEFAULT_ALPHA, 3, 60);
+
+ counter.add(80.0, Unit.KILO);
+ counter.add(75.0, Unit.KILO);
+ counter.add(90.0, Unit.KILO);
+
+ final OptimizedStatistics accurate = new OptimizedStatistics();
+ accurate.addValue(80);
+ accurate.addValue(75);
+ accurate.addValue(90);
+
+ {
+ final OptimizedStatistics statistics = counter.getStatistics();
+ assertEquals(3, statistics.getN());
+
+ // ensure counter and stats reflects the same state
+ assertEquals(counter.getHits(), statistics.getN());
+ assertEquals(counter.getSum(), statistics.getSum(), 0.);
+ assertEquals(counter.getMean(), statistics.getMean(), 0.);
+ assertEquals(counter.getVariance(), statistics.getVariance(), 0.);
+ assertEquals(counter.getMin(), statistics.getMin(), 0.);
+ assertEquals(counter.getMax(), statistics.getMax(), 0.);
+
+ // check values are accurate
+ assertEquals(accurate.getSum(), statistics.getSum(), 0.);
+ assertEquals(accurate.getMin(), statistics.getMin(), 0.);
+ assertEquals(accurate.getMax(), statistics.getMax(), 0.);
+ assertEquals(accurate.getMean(), statistics.getMean(), 10);
+ assertEquals(accurate.getStandardDeviation(), statistics.getStandardDeviation(), 3);
+ // assertEquals(accurate.getVariance(), statistics.getVariance(), 15); // not perfect yet
+ }
+ }
+}
+