You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by rm...@apache.org on 2014/02/11 07:37:32 UTC
svn commit: r1566992 - in /incubator/sirona/trunk:
core/src/main/java/org/apache/sirona/counters/ server/boomerang/
server/boomerang/src/main/java/org/apache/sirona/boomerang/
server/boomerang/src/test/java/org/
server/boomerang/src/test/java/org/apach...
Author: rmannibucau
Date: Tue Feb 11 06:37:32 2014
New Revision: 1566992
URL: http://svn.apache.org/r1566992
Log:
SIRONA-11 cleaning up boomerang collector mode
Added:
incubator/sirona/trunk/server/boomerang/src/test/java/org/
incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/
incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/
incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/
incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java
Modified:
incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java
incubator/sirona/trunk/server/boomerang/pom.xml
incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java
Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java?rev=1566992&r1=1566991&r2=1566992&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java Tue Feb 11 06:37:32 2014
@@ -34,10 +34,13 @@ public class DefaultCounter implements C
private ObjectName jmx = null;
public DefaultCounter(final Key key, final CounterDataStore store) {
+ this(key, store, new OptimizedStatistics());
+ }
+ public DefaultCounter(final Key key, final CounterDataStore store, final OptimizedStatistics statistics) {
this.key = key;
this.dataStore = store;
- this.statistics = new OptimizedStatistics();
+ this.statistics = statistics;
}
public void addInternal(final double delta) { // should be called from a thread safe environment
Modified: incubator/sirona/trunk/server/boomerang/pom.xml
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/boomerang/pom.xml?rev=1566992&r1=1566991&r2=1566992&view=diff
==============================================================================
--- incubator/sirona/trunk/server/boomerang/pom.xml (original)
+++ incubator/sirona/trunk/server/boomerang/pom.xml Tue Feb 11 06:37:32 2014
@@ -36,5 +36,10 @@
<groupId>org.apache.sirona</groupId>
<artifactId>sirona-core</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
</dependencies>
</project>
Modified: incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java?rev=1566992&r1=1566991&r2=1566992&view=diff
==============================================================================
--- incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java (original)
+++ incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java Tue Feb 11 06:37:32 2014
@@ -21,12 +21,16 @@ import org.apache.sirona.SironaException
import org.apache.sirona.boomerang.parser.BoomerangData;
import org.apache.sirona.configuration.ioc.IoCs;
import org.apache.sirona.counters.Counter;
+import org.apache.sirona.counters.DefaultCounter;
import org.apache.sirona.counters.OptimizedStatistics;
import org.apache.sirona.counters.Unit;
import org.apache.sirona.math.M2AwareStatisticalSummary;
import org.apache.sirona.repositories.Repository;
+import org.apache.sirona.store.BatchFuture;
import org.apache.sirona.store.counter.CollectorCounterStore;
import org.apache.sirona.store.counter.CounterDataStore;
+import org.apache.sirona.store.counter.InMemoryCounterDataStore;
+import org.apache.sirona.util.DaemonThreadFactory;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
@@ -41,7 +45,12 @@ import java.net.URLDecoder;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import static java.lang.Math.abs;
import static org.apache.sirona.boomerang.parser.QueryParser.parse;
// all data are in query, this is parsed then added to relative counter
@@ -50,17 +59,18 @@ import static org.apache.sirona.boomeran
public class BoomerangServlet extends HttpServlet {
private static final String UTF_8 = "UTF-8";
- private static final Unit BYTE_PER_SEC = new Unit("b/s");
- private static final Role BOOMERANG_PERCEIVED = new Role("boomerang_perceived", Unit.Time.MILLISECOND);
- private static final Role BOOMERANG_LATENCY = new Role("boomerang_latency", Unit.Time.MILLISECOND);
- private static final Role BOOMERANG_BANDWITH = new Role("boomerang_bandwidth", BYTE_PER_SEC);
+ public static final Unit BYTE_PER_SEC = new Unit("b/s");
+ public static final Role BOOMERANG_PERCEIVED = new Role("boomerang_perceived", Unit.Time.MILLISECOND);
+ public static final Role BOOMERANG_LATENCY = new Role("boomerang_latency", Unit.Time.MILLISECOND);
+ public static final Role BOOMERANG_BANDWITH = new Role("boomerang_bandwidth", BYTE_PER_SEC);
// marker doesn't make much sense for boomerang
private static final String BOOMERANG_MARKER = "boomerang";
+ private static final String DEFAULT_DELAY = "4000";
private String encoding = UTF_8;
- private CollectorCounterStore collectorCounterStore = null;
private CounterDataStore counterStore = null;
+ private BatchFuture future = null;
@Override
public void init(final ServletConfig config) throws ServletException {
@@ -72,9 +82,30 @@ public class BoomerangServlet extends Ht
// force init to ensure we have stores
IoCs.findOrCreateInstance(Repository.class);
try {
- collectorCounterStore = IoCs.findOrCreateInstance(CollectorCounterStore.class);
+ final CollectorCounterStore collectorCounterStore = IoCs.getInstance(CollectorCounterStore.class);
+ if (collectorCounterStore == null) {
+ counterStore = IoCs.getInstance(CounterDataStore.class);
+ } else {
+ counterStore = new InitializedCounterDataStore(collectorCounterStore);
+
+ String delayStr = config.getInitParameter("rate");
+ if (delayStr == null) {
+ delayStr = DEFAULT_DELAY;
+ }
+ final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("boomerang-updater-" + abs(hashCode())));
+ final long delay = Long.parseLong(delayStr);
+ final ScheduledFuture<?> task = pool.scheduleWithFixedDelay(new CollectorUpdater(collectorCounterStore, counterStore), delay, delay, TimeUnit.MILLISECONDS);
+ future = new BatchFuture(pool, task);
+ }
} catch (final SironaException se) {
- counterStore = IoCs.findOrCreateInstance(CounterDataStore.class);
+ counterStore = IoCs.getInstance(CounterDataStore.class);
+ }
+ }
+
+ @Override
+ public void destroy() {
+ if (future != null) {
+ future.done();
}
}
@@ -102,25 +133,8 @@ public class BoomerangServlet extends Ht
private void addToCounter(final Role role, final String name, final long value) throws MalformedURLException {
final Counter.Key key = new Counter.Key(role, name);
- if (collectorCounterStore != null) { // TODO: better locking? or better window aggregation with bg thread
- final Counter counter = collectorCounterStore.getOrCreateCounter(key, BOOMERANG_MARKER);
- synchronized (counter) { // counter is cached so we can sync it
- final OptimizedStatistics computationalStats = new OptimizedStatistics(
- counter.getHits(), counter.getSum(), counter.getMin(), counter.getMax(),
- counter.getMean(), counter.getSecondMoment()
- ).addValue(value);
-
- final M2AwareStatisticalSummary stats = new M2AwareStatisticalSummary(
- computationalStats.getMean(), computationalStats.getVariance(), computationalStats.getN(),
- computationalStats.getMax(), computationalStats.getMin(), computationalStats.getSum(),
- computationalStats.getSecondMoment()
- );
- collectorCounterStore.update(key, BOOMERANG_MARKER, stats, -1);
- }
- } else {
- final Counter counter = counterStore.getOrCreateCounter(key);
- counterStore.addToCounter(counter, key.getRole().getUnit().convert(value, Unit.Time.MILLISECOND));
- }
+ final Counter counter = counterStore.getOrCreateCounter(key);
+ counterStore.addToCounter(counter, key.getRole().getUnit().convert(value, Unit.Time.MILLISECOND));
}
private Map<String, String> toMap(final String query) {
@@ -146,4 +160,48 @@ public class BoomerangServlet extends Ht
}
return params;
}
+
+ private static class CollectorUpdater implements Runnable {
+ private final CollectorCounterStore collectorCounterStore;
+ private final CounterDataStore aggregator;
+
+ private CollectorUpdater(final CollectorCounterStore collectorCounterStore, final CounterDataStore aggregator) {
+ this.collectorCounterStore = collectorCounterStore;
+ this.aggregator = aggregator;
+ }
+
+ @Override
+ public void run() {
+ for (final Counter aggregate : aggregator.getCounters()) {
+ final M2AwareStatisticalSummary stats = new M2AwareStatisticalSummary(
+ aggregate.getMean(), aggregate.getVariance(), aggregate.getHits(),
+ aggregate.getMax(), aggregate.getMin(), aggregate.getSum(),
+ aggregate.getSecondMoment()
+ );
+ collectorCounterStore.update(aggregate.getKey(), BOOMERANG_MARKER, stats, aggregate.getMaxConcurrency());
+ }
+ }
+ }
+
+ private static class InitializedCounterDataStore extends InMemoryCounterDataStore {
+ private final CollectorCounterStore delegate;
+
+ public InitializedCounterDataStore(final CollectorCounterStore collectorCounterStore) {
+ this.delegate = collectorCounterStore;
+ }
+
+ @Override
+ protected Counter newCounter(final Counter.Key key) {
+ return new InitializedDefaultCounter(key, this, delegate.getOrCreateCounter(key, BOOMERANG_MARKER));
+ }
+ }
+
+ private static class InitializedDefaultCounter extends DefaultCounter {
+ public InitializedDefaultCounter(final Key key, final InitializedCounterDataStore store, final Counter aggregate) {
+ super(key, store, new OptimizedStatistics(
+ aggregate.getHits(), aggregate.getSum(), aggregate.getMin(),
+ aggregate.getMax(), aggregate.getMean(), aggregate.getSecondMoment()
+ ));
+ }
+ }
}
Added: incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java?rev=1566992&view=auto
==============================================================================
--- incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java (added)
+++ incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java Tue Feb 11 06:37:32 2014
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sirona.boomerang;
+
+import org.apache.sirona.counters.Counter;
+import org.apache.sirona.repositories.Repository;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class BoomerangServletTest {
+ @Before
+ @After
+ public void init() {
+ Repository.INSTANCE.reset();
+ }
+
+ @Test
+ public void collect() throws ServletException, IOException {
+ final BoomerangServlet servlet = new BoomerangServlet();
+ final ClassLoader loader = Thread.currentThread().getContextClassLoader();
+
+ servlet.init(ServletConfig.class.cast(
+ Proxy.newProxyInstance(loader, new Class<?>[]{ServletConfig.class}, new InvocationHandler() {
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+ return null;
+ }
+ })
+ ));
+ try {
+ servlet.service(
+ HttpServletRequest.class.cast(
+ Proxy.newProxyInstance(loader, new Class<?>[]{HttpServletRequest.class}, new InvocationHandler() {
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+ if ("getQueryString".equals(method.getName())) {
+ return "v=0.9&u=http%3A%2F%2Flocalhost%2Fboomerang%2Ftest.html&rt.start=navigation" +
+ "&rt.bstart=1391971905166&rt.end=1391971905170&t_done=146&t_resp=6" +
+ "&t_page=140&r=&t_other=boomerang%7C2%2Cboomr_fb%7C142&bw=NaN&bw_err=NaN" +
+ "&lat=19&lat_err=2.85&bw_time=1391971906";
+ }
+ return null;
+ }
+ })),
+ HttpServletResponse.class.cast(
+ Proxy.newProxyInstance(loader, new Class<?>[]{HttpServletResponse.class}, new InvocationHandler() {
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+ if ("getWriter".equals(method.getName())) {
+ return new PrintWriter(new ByteArrayOutputStream()); // don't care
+ }
+ return null;
+ }
+ })));
+ } finally {
+ servlet.destroy();
+ }
+
+ final Counter perceived = Repository.INSTANCE.getCounter(new Counter.Key(BoomerangServlet.BOOMERANG_PERCEIVED, "/boomerang/test.html"));
+ assertNotNull(perceived);
+ assertEquals(1, perceived.getHits());
+ assertEquals(146., perceived.getMax(), 0.);
+
+ final Counter latency = Repository.INSTANCE.getCounter(new Counter.Key(BoomerangServlet.BOOMERANG_LATENCY, "/boomerang/test.html"));
+ assertNotNull(latency);
+ assertEquals(1, latency.getHits());
+ assertEquals(19., latency.getMax(), 0.);
+ }
+}