You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@sirona.apache.org by rm...@apache.org on 2014/02/11 07:37:32 UTC

svn commit: r1566992 - in /incubator/sirona/trunk: core/src/main/java/org/apache/sirona/counters/ server/boomerang/ server/boomerang/src/main/java/org/apache/sirona/boomerang/ server/boomerang/src/test/java/org/ server/boomerang/src/test/java/org/apach...

Author: rmannibucau
Date: Tue Feb 11 06:37:32 2014
New Revision: 1566992

URL: http://svn.apache.org/r1566992
Log:
SIRONA-11 cleaning up boomerang collector mode

Added:
    incubator/sirona/trunk/server/boomerang/src/test/java/org/
    incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/
    incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/
    incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/
    incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java
Modified:
    incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java
    incubator/sirona/trunk/server/boomerang/pom.xml
    incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java

Modified: incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java?rev=1566992&r1=1566991&r2=1566992&view=diff
==============================================================================
--- incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java (original)
+++ incubator/sirona/trunk/core/src/main/java/org/apache/sirona/counters/DefaultCounter.java Tue Feb 11 06:37:32 2014
@@ -34,10 +34,13 @@ public class DefaultCounter implements C
     private ObjectName jmx = null;
 
     public DefaultCounter(final Key key, final CounterDataStore store) {
+        this(key, store, new OptimizedStatistics());
+    }
+    public DefaultCounter(final Key key, final CounterDataStore store, final OptimizedStatistics statistics) {
         this.key = key;
         this.dataStore = store;
 
-        this.statistics = new OptimizedStatistics();
+        this.statistics = statistics;
     }
 
     public void addInternal(final double delta) { // should be called from a thread safe environment

Modified: incubator/sirona/trunk/server/boomerang/pom.xml
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/boomerang/pom.xml?rev=1566992&r1=1566991&r2=1566992&view=diff
==============================================================================
--- incubator/sirona/trunk/server/boomerang/pom.xml (original)
+++ incubator/sirona/trunk/server/boomerang/pom.xml Tue Feb 11 06:37:32 2014
@@ -36,5 +36,10 @@
       <groupId>org.apache.sirona</groupId>
       <artifactId>sirona-core</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+    </dependency>
   </dependencies>
 </project>

Modified: incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java?rev=1566992&r1=1566991&r2=1566992&view=diff
==============================================================================
--- incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java (original)
+++ incubator/sirona/trunk/server/boomerang/src/main/java/org/apache/sirona/boomerang/BoomerangServlet.java Tue Feb 11 06:37:32 2014
@@ -21,12 +21,16 @@ import org.apache.sirona.SironaException
 import org.apache.sirona.boomerang.parser.BoomerangData;
 import org.apache.sirona.configuration.ioc.IoCs;
 import org.apache.sirona.counters.Counter;
+import org.apache.sirona.counters.DefaultCounter;
 import org.apache.sirona.counters.OptimizedStatistics;
 import org.apache.sirona.counters.Unit;
 import org.apache.sirona.math.M2AwareStatisticalSummary;
 import org.apache.sirona.repositories.Repository;
+import org.apache.sirona.store.BatchFuture;
 import org.apache.sirona.store.counter.CollectorCounterStore;
 import org.apache.sirona.store.counter.CounterDataStore;
+import org.apache.sirona.store.counter.InMemoryCounterDataStore;
+import org.apache.sirona.util.DaemonThreadFactory;
 
 import javax.servlet.ServletConfig;
 import javax.servlet.ServletException;
@@ -41,7 +45,12 @@ import java.net.URLDecoder;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
+import static java.lang.Math.abs;
 import static org.apache.sirona.boomerang.parser.QueryParser.parse;
 
 // all data are in query, this is parsed then added to relative counter
@@ -50,17 +59,18 @@ import static org.apache.sirona.boomeran
 public class BoomerangServlet extends HttpServlet {
     private static final String UTF_8 = "UTF-8";
 
-    private static final Unit BYTE_PER_SEC = new Unit("b/s");
-    private static final Role BOOMERANG_PERCEIVED = new Role("boomerang_perceived", Unit.Time.MILLISECOND);
-    private static final Role BOOMERANG_LATENCY = new Role("boomerang_latency", Unit.Time.MILLISECOND);
-    private static final Role BOOMERANG_BANDWITH = new Role("boomerang_bandwidth", BYTE_PER_SEC);
+    public static final Unit BYTE_PER_SEC = new Unit("b/s");
+    public static final Role BOOMERANG_PERCEIVED = new Role("boomerang_perceived", Unit.Time.MILLISECOND);
+    public static final Role BOOMERANG_LATENCY = new Role("boomerang_latency", Unit.Time.MILLISECOND);
+    public static final Role BOOMERANG_BANDWITH = new Role("boomerang_bandwidth", BYTE_PER_SEC);
 
     // marker doesn't make much sense for boomerang
     private static final String BOOMERANG_MARKER = "boomerang";
+    private static final String DEFAULT_DELAY = "4000";
 
     private String encoding = UTF_8;
-    private CollectorCounterStore collectorCounterStore = null;
     private CounterDataStore counterStore = null;
+    private BatchFuture future = null;
 
     @Override
     public void init(final ServletConfig config) throws ServletException {
@@ -72,9 +82,30 @@ public class BoomerangServlet extends Ht
         // force init to ensure we have stores
         IoCs.findOrCreateInstance(Repository.class);
         try {
-            collectorCounterStore = IoCs.findOrCreateInstance(CollectorCounterStore.class);
+            final CollectorCounterStore collectorCounterStore = IoCs.getInstance(CollectorCounterStore.class);
+            if (collectorCounterStore == null) {
+                counterStore = IoCs.getInstance(CounterDataStore.class);
+            } else {
+                counterStore = new InitializedCounterDataStore(collectorCounterStore);
+
+                String delayStr = config.getInitParameter("rate");
+                if (delayStr == null) {
+                    delayStr = DEFAULT_DELAY;
+                }
+                final ScheduledExecutorService pool = Executors.newSingleThreadScheduledExecutor(new DaemonThreadFactory("boomerang-updater-" + abs(hashCode())));
+                final long delay = Long.parseLong(delayStr);
+                final ScheduledFuture<?> task = pool.scheduleWithFixedDelay(new CollectorUpdater(collectorCounterStore, counterStore), delay, delay, TimeUnit.MILLISECONDS);
+                future = new BatchFuture(pool, task);
+            }
         } catch (final SironaException se) {
-            counterStore = IoCs.findOrCreateInstance(CounterDataStore.class);
+            counterStore = IoCs.getInstance(CounterDataStore.class);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        if (future != null) {
+            future.done();
         }
     }
 
@@ -102,25 +133,8 @@ public class BoomerangServlet extends Ht
 
     private void addToCounter(final Role role, final String name, final long value) throws MalformedURLException {
         final Counter.Key key = new Counter.Key(role, name);
-        if (collectorCounterStore != null) { // TODO: better locking? or better window aggregation with bg thread
-            final Counter counter = collectorCounterStore.getOrCreateCounter(key, BOOMERANG_MARKER);
-            synchronized (counter) { // counter is cached so we can sync it
-                final OptimizedStatistics computationalStats = new OptimizedStatistics(
-                    counter.getHits(), counter.getSum(), counter.getMin(), counter.getMax(),
-                    counter.getMean(), counter.getSecondMoment()
-                ).addValue(value);
-
-                final M2AwareStatisticalSummary stats = new M2AwareStatisticalSummary(
-                    computationalStats.getMean(), computationalStats.getVariance(), computationalStats.getN(),
-                    computationalStats.getMax(), computationalStats.getMin(), computationalStats.getSum(),
-                    computationalStats.getSecondMoment()
-                );
-                collectorCounterStore.update(key, BOOMERANG_MARKER, stats, -1);
-            }
-        } else {
-            final Counter counter = counterStore.getOrCreateCounter(key);
-            counterStore.addToCounter(counter, key.getRole().getUnit().convert(value, Unit.Time.MILLISECOND));
-        }
+        final Counter counter = counterStore.getOrCreateCounter(key);
+        counterStore.addToCounter(counter, key.getRole().getUnit().convert(value, Unit.Time.MILLISECOND));
     }
 
     private Map<String, String> toMap(final String query) {
@@ -146,4 +160,48 @@ public class BoomerangServlet extends Ht
         }
         return params;
     }
+
+    private static class CollectorUpdater implements Runnable {
+        private final CollectorCounterStore collectorCounterStore;
+        private final CounterDataStore aggregator;
+
+        private CollectorUpdater(final CollectorCounterStore collectorCounterStore, final CounterDataStore aggregator) {
+            this.collectorCounterStore = collectorCounterStore;
+            this.aggregator = aggregator;
+        }
+
+        @Override
+        public void run() {
+            for (final Counter aggregate : aggregator.getCounters()) {
+                final M2AwareStatisticalSummary stats = new M2AwareStatisticalSummary(
+                    aggregate.getMean(), aggregate.getVariance(), aggregate.getHits(),
+                    aggregate.getMax(), aggregate.getMin(), aggregate.getSum(),
+                    aggregate.getSecondMoment()
+                );
+                collectorCounterStore.update(aggregate.getKey(), BOOMERANG_MARKER, stats, aggregate.getMaxConcurrency());
+            }
+        }
+    }
+
+    private static class InitializedCounterDataStore extends InMemoryCounterDataStore {
+        private final CollectorCounterStore delegate;
+
+        public InitializedCounterDataStore(final CollectorCounterStore collectorCounterStore) {
+            this.delegate = collectorCounterStore;
+        }
+
+        @Override
+        protected Counter newCounter(final Counter.Key key) {
+            return new InitializedDefaultCounter(key, this, delegate.getOrCreateCounter(key, BOOMERANG_MARKER));
+        }
+    }
+
+    private static class InitializedDefaultCounter extends DefaultCounter {
+        public InitializedDefaultCounter(final Key key, final InitializedCounterDataStore store, final Counter aggregate) {
+            super(key, store, new OptimizedStatistics(
+                aggregate.getHits(), aggregate.getSum(), aggregate.getMin(),
+                aggregate.getMax(), aggregate.getMean(), aggregate.getSecondMoment()
+            ));
+        }
+    }
 }

Added: incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java
URL: http://svn.apache.org/viewvc/incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java?rev=1566992&view=auto
==============================================================================
--- incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java (added)
+++ incubator/sirona/trunk/server/boomerang/src/test/java/org/apache/sirona/boomerang/BoomerangServletTest.java Tue Feb 11 06:37:32 2014
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.sirona.boomerang;
+
+import org.apache.sirona.counters.Counter;
+import org.apache.sirona.repositories.Repository;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.servlet.ServletConfig;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class BoomerangServletTest {
+    @Before
+    @After
+    public void init() {
+        Repository.INSTANCE.reset();
+    }
+
+    @Test
+    public void collect() throws ServletException, IOException {
+        final BoomerangServlet servlet = new BoomerangServlet();
+        final ClassLoader loader = Thread.currentThread().getContextClassLoader();
+
+        servlet.init(ServletConfig.class.cast(
+                Proxy.newProxyInstance(loader, new Class<?>[]{ServletConfig.class}, new InvocationHandler() {
+                    @Override
+                    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+                        return null;
+                    }
+                })
+        ));
+        try {
+            servlet.service(
+                    HttpServletRequest.class.cast(
+                        Proxy.newProxyInstance(loader, new Class<?>[]{HttpServletRequest.class}, new InvocationHandler() {
+                            @Override
+                            public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+                                if ("getQueryString".equals(method.getName())) {
+                                    return "v=0.9&u=http%3A%2F%2Flocalhost%2Fboomerang%2Ftest.html&rt.start=navigation" +
+                                            "&rt.bstart=1391971905166&rt.end=1391971905170&t_done=146&t_resp=6" +
+                                            "&t_page=140&r=&t_other=boomerang%7C2%2Cboomr_fb%7C142&bw=NaN&bw_err=NaN" +
+                                            "&lat=19&lat_err=2.85&bw_time=1391971906";
+                                }
+                                return null;
+                            }
+                        })),
+                    HttpServletResponse.class.cast(
+                        Proxy.newProxyInstance(loader, new Class<?>[]{HttpServletResponse.class}, new InvocationHandler() {
+                            @Override
+                            public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+                                if ("getWriter".equals(method.getName())) {
+                                    return new PrintWriter(new ByteArrayOutputStream()); // don't care
+                                }
+                                return null;
+                            }
+                        })));
+        } finally {
+            servlet.destroy();
+        }
+
+        final Counter perceived = Repository.INSTANCE.getCounter(new Counter.Key(BoomerangServlet.BOOMERANG_PERCEIVED, "/boomerang/test.html"));
+        assertNotNull(perceived);
+        assertEquals(1, perceived.getHits());
+        assertEquals(146., perceived.getMax(), 0.);
+
+        final Counter latency = Repository.INSTANCE.getCounter(new Counter.Key(BoomerangServlet.BOOMERANG_LATENCY, "/boomerang/test.html"));
+        assertNotNull(latency);
+        assertEquals(1, latency.getHits());
+        assertEquals(19., latency.getMax(), 0.);
+    }
+}