You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/12 00:08:31 UTC

incubator-usergrid git commit: Adds a simple 30 second flush interval task

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-907 [created] 9bebb069b


Adds a simple 30 second flush interval task


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9bebb069
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9bebb069
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9bebb069

Branch: refs/heads/USERGRID-907
Commit: 9bebb069bd7fef26dcc81205c0384588593df064
Parents: 566046b
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 11 16:08:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 11 16:08:30 2015 -0600

----------------------------------------------------------------------
 .../apache/usergrid/count/AbstractBatcher.java  | 70 +++++++++++++++-----
 .../main/resources/usergrid-core-context.xml    |  1 +
 .../apache/usergrid/persistence/CounterIT.java  | 50 ++++++++++++++
 3 files changed, 106 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
index cd2d2e9..e7dd439 100644
--- a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
+++ b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
@@ -21,12 +21,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.count.common.Count;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Counter;
@@ -42,17 +48,31 @@ import com.yammer.metrics.core.TimerContext;
 public abstract class AbstractBatcher implements Batcher {
     protected BatchSubmitter batchSubmitter;
 
+    protected static final Logger logger = LoggerFactory.getLogger( AbstractBatcher.class );
+
     private volatile Batch batch;
     private final AtomicLong opCount = new AtomicLong();
     private final Timer addTimer =
             Metrics.newTimer( AbstractBatcher.class, "add_invocation", TimeUnit.MICROSECONDS, TimeUnit.SECONDS );
     protected final Counter invocationCounter = Metrics.newCounter( AbstractBatcher.class, "batch_add_invocations" );
-    private final Counter existingCounterHit = Metrics.newCounter( AbstractBatcher.class, "counter_existed" );
     // TODO add batchCount, remove shouldSubmit, impl submit, change simpleBatcher to just be an extension
     protected int batchSize = 500;
+    protected int batchIntervalSeconds = 10;
     private final AtomicLong batchSubmissionCount = new AtomicLong();
-    private final AtomicBoolean lock = new AtomicBoolean( false );
 
+    /**
+     * Create our scheduler to fire our execution
+     */
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );
+
+
+    /**
+     * Set the batch interval in seconds
+     * @param batchIntervalSeconds
+     */
+    public void setBatchInterval(int batchIntervalSeconds){
+       this.batchIntervalSeconds  = batchIntervalSeconds;
+    }
 
     public void setBatchSize( int batchSize ) {
         this.batchSize = batchSize;
@@ -88,16 +108,23 @@ public abstract class AbstractBatcher implements Batcher {
     }
 
 
-    Batch getBatch() {
+    private Batch getBatch() {
         Batch active = batch;
         if ( active == null ) {
             synchronized ( this ) {
                 active = batch;
                 if ( active == null ) {
                     batch = active = new Batch();
+
+                    //now schedule our task for execution since we're creating a batch
+                    scheduler.scheduleWithFixedDelay( new BatchFlusher(), this.batchIntervalSeconds,
+                        this.batchIntervalSeconds, TimeUnit.SECONDS );
+
                 }
             }
         }
+
+        //we want to flush, and we have no capacity left, perform a flush
         if ( batchSize > 1 && active.getCapacity() == 0 ) {
             synchronized ( this ) {
                 if ( active.getCapacity() == 0 ) {
@@ -105,9 +132,29 @@ public abstract class AbstractBatcher implements Batcher {
                 }
             }
         }
+
         return active;
     }
 
+    private void flush(){
+        synchronized(this) {
+            getBatch().flush();
+        }
+    }
+
+
+    /**
+     * Runnable that will flush the batch every 30 seconds
+     */
+    private final class BatchFlusher implements Runnable {
+
+        @Override
+        public void run() {
+            //explicitly flush the batch
+            AbstractBatcher.this.flush();
+        }
+    }
+
 
     public long getBatchSubmissionCount() {
         return batchSubmissionCount.get();
@@ -118,11 +165,9 @@ public abstract class AbstractBatcher implements Batcher {
         private BlockingQueue<Count> counts;
         private final AtomicInteger localCallCounter = new AtomicInteger();
 
-        private final AtomicBoolean lock = new AtomicBoolean( false );
-
 
         Batch() {
-            counts = new ArrayBlockingQueue<Count>( batchSize );
+            counts = new ArrayBlockingQueue<>( batchSize );
         }
 
 
@@ -131,6 +176,8 @@ public abstract class AbstractBatcher implements Batcher {
         }
 
 
+
+
         void flush() {
             ArrayList<Count> flushed = new ArrayList<Count>( batchSize );
             counts.drainTo( flushed );
@@ -146,7 +193,7 @@ public abstract class AbstractBatcher implements Batcher {
                 counts.offer( count, 500, TimeUnit.MILLISECONDS );
             }
             catch ( Exception ex ) {
-                ex.printStackTrace();
+                logger.error( "Unable to add count, dropping count {}", count, ex );
             }
         }
 
@@ -157,7 +204,7 @@ public abstract class AbstractBatcher implements Batcher {
                 f.get();
             }
             catch ( Exception ex ) {
-                ex.printStackTrace();
+                logger.error( "Unable to add count, dropping count {}", count, ex );
             }
             batchSubmissionCount.incrementAndGet();
             opCount.incrementAndGet();
@@ -165,12 +212,5 @@ public abstract class AbstractBatcher implements Batcher {
         }
 
 
-        /**
-         * The number of times the {@link #add(org.apache.usergrid.count.common.Count)} method has been invoked on this batch
-         * instance
-         */
-        public int getLocalCallCount() {
-            return localCallCounter.get();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/main/resources/usergrid-core-context.xml
----------------------------------------------------------------------
diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml
index 53ae4e2..f3eb482 100644
--- a/stack/core/src/main/resources/usergrid-core-context.xml
+++ b/stack/core/src/main/resources/usergrid-core-context.xml
@@ -134,6 +134,7 @@
 
     <bean id="simpleBatcher" class="org.apache.usergrid.count.SimpleBatcher">
         <property name="batchSubmitter" ref="batchSubmitter"/>
+        <property name="batchInterval" value="${usergrid.counter.batch.interval}"/>
         <property name="batchSize" value="${usergrid.counter.batch.size}"/>
     </bean>
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
index a394a79..2c72d26 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
@@ -263,4 +263,54 @@ public class CounterIT extends AbstractCoreIT {
         assertEquals( 1,
             r.getCounters().get( 0 ).getValues().get( 0 ).getValue() - originalCount );
     }
+
+
+
+
+    @Test
+    public void testTimedFlush() throws Exception {
+        LOG.info( "CounterIT.testCounters" );
+
+        EntityManager em = app.getEntityManager();
+
+
+        assertNotNull( em );
+
+
+        UUID user1 = UUID.randomUUID();
+        UUID user2 = UUID.randomUUID();
+        // UUID groupId = UUID.randomUUID();
+
+
+        Event event = null;
+
+        for ( int i = 0; i < 100; i++ ) {
+            event = new Event();
+            event.setTimestamp( ts + ( i * 60 * 1000 ) );
+            event.addCounter( "visits", 1 );
+            event.setUser( user1 );
+            em.create( event );
+
+            event = new Event();
+            event.setTimestamp( ts + ( i * 60 * 1000 ) );
+            event.addCounter( "visits", 1 );
+            event.setUser( user2 );
+            em.create( event );
+        }
+
+        //sleep to ensure the flush has executed
+        Thread.sleep( 30000 );
+
+        Results r = em.getAggregateCounters( null, null, null, "visits", CounterResolution.SIX_HOUR, ts, System.currentTimeMillis(), false );
+
+        final AggregateCounterSet counter = r.getCounters().get( 0 );
+
+        final long count = counter.getValues().get( 0 ).getValue();
+
+        final String name = counter.getName();
+
+        assertEquals("visits", name);
+        assertEquals(count, 200);
+
+    }
 }