You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/12 00:51:29 UTC

[1/2] incubator-usergrid git commit: Adds a simple 30 second flush interval task

Repository: incubator-usergrid
Updated Branches:
  refs/heads/USERGRID-907-2.0 [created] 13f6d2344


Adds a simple 30 second flush interval task


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/117ab5c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/117ab5c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/117ab5c5

Branch: refs/heads/USERGRID-907-2.0
Commit: 117ab5c598a7ecca44f8e552d9550ece57a51541
Parents: 9f81d7b
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 11 16:08:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 11 16:13:08 2015 -0600

----------------------------------------------------------------------
 .../apache/usergrid/count/AbstractBatcher.java  | 70 +++++++++++++++-----
 .../main/resources/usergrid-core-context.xml    |  1 +
 .../apache/usergrid/persistence/CounterIT.java  | 50 ++++++++++++++
 3 files changed, 106 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117ab5c5/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
index cd2d2e9..e7dd439 100644
--- a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
+++ b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
@@ -21,12 +21,18 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.usergrid.count.common.Count;
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Counter;
@@ -42,17 +48,31 @@ import com.yammer.metrics.core.TimerContext;
 public abstract class AbstractBatcher implements Batcher {
     protected BatchSubmitter batchSubmitter;
 
+    protected static final Logger logger = LoggerFactory.getLogger( AbstractBatcher.class );
+
     private volatile Batch batch;
     private final AtomicLong opCount = new AtomicLong();
     private final Timer addTimer =
             Metrics.newTimer( AbstractBatcher.class, "add_invocation", TimeUnit.MICROSECONDS, TimeUnit.SECONDS );
     protected final Counter invocationCounter = Metrics.newCounter( AbstractBatcher.class, "batch_add_invocations" );
-    private final Counter existingCounterHit = Metrics.newCounter( AbstractBatcher.class, "counter_existed" );
     // TODO add batchCount, remove shouldSubmit, impl submit, change simpleBatcher to just be an extension
     protected int batchSize = 500;
+    protected int batchIntervalSeconds = 10;
     private final AtomicLong batchSubmissionCount = new AtomicLong();
-    private final AtomicBoolean lock = new AtomicBoolean( false );
 
+    /**
+     * Create our scheduler to fire our execution
+     */
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );
+
+
+    /**
+     * Set the batch interval in seconds
+     * @param batchIntervalSeconds
+     */
+    public void setBatchInterval(int batchIntervalSeconds){
+       this.batchIntervalSeconds  = batchIntervalSeconds;
+    }
 
     public void setBatchSize( int batchSize ) {
         this.batchSize = batchSize;
@@ -88,16 +108,23 @@ public abstract class AbstractBatcher implements Batcher {
     }
 
 
-    Batch getBatch() {
+    private Batch getBatch() {
         Batch active = batch;
         if ( active == null ) {
             synchronized ( this ) {
                 active = batch;
                 if ( active == null ) {
                     batch = active = new Batch();
+
+                    //now schedule our task for execution since we're creating a batch
+                    scheduler.scheduleWithFixedDelay( new BatchFlusher(), this.batchIntervalSeconds,
+                        this.batchIntervalSeconds, TimeUnit.SECONDS );
+
                 }
             }
         }
+
+        //we want to flush, and we have no capacity left, perform a flush
         if ( batchSize > 1 && active.getCapacity() == 0 ) {
             synchronized ( this ) {
                 if ( active.getCapacity() == 0 ) {
@@ -105,9 +132,29 @@ public abstract class AbstractBatcher implements Batcher {
                 }
             }
         }
+
         return active;
     }
 
+    private void flush(){
+        synchronized(this) {
+            getBatch().flush();
+        }
+    }
+
+
+    /**
+     * Runnable that will flush the batch every 30 seconds
+     */
+    private final class BatchFlusher implements Runnable {
+
+        @Override
+        public void run() {
+            //explicitly flush the batch
+            AbstractBatcher.this.flush();
+        }
+    }
+
 
     public long getBatchSubmissionCount() {
         return batchSubmissionCount.get();
@@ -118,11 +165,9 @@ public abstract class AbstractBatcher implements Batcher {
         private BlockingQueue<Count> counts;
         private final AtomicInteger localCallCounter = new AtomicInteger();
 
-        private final AtomicBoolean lock = new AtomicBoolean( false );
-
 
         Batch() {
-            counts = new ArrayBlockingQueue<Count>( batchSize );
+            counts = new ArrayBlockingQueue<>( batchSize );
         }
 
 
@@ -131,6 +176,8 @@ public abstract class AbstractBatcher implements Batcher {
         }
 
 
+
+
         void flush() {
             ArrayList<Count> flushed = new ArrayList<Count>( batchSize );
             counts.drainTo( flushed );
@@ -146,7 +193,7 @@ public abstract class AbstractBatcher implements Batcher {
                 counts.offer( count, 500, TimeUnit.MILLISECONDS );
             }
             catch ( Exception ex ) {
-                ex.printStackTrace();
+                logger.error( "Unable to add count, dropping count {}", count, ex );
             }
         }
 
@@ -157,7 +204,7 @@ public abstract class AbstractBatcher implements Batcher {
                 f.get();
             }
             catch ( Exception ex ) {
-                ex.printStackTrace();
+                logger.error( "Unable to add count, dropping count {}", count, ex );
             }
             batchSubmissionCount.incrementAndGet();
             opCount.incrementAndGet();
@@ -165,12 +212,5 @@ public abstract class AbstractBatcher implements Batcher {
         }
 
 
-        /**
-         * The number of times the {@link #add(org.apache.usergrid.count.common.Count)} method has been invoked on this batch
-         * instance
-         */
-        public int getLocalCallCount() {
-            return localCallCounter.get();
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117ab5c5/stack/core/src/main/resources/usergrid-core-context.xml
----------------------------------------------------------------------
diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml
index 5235daa..4ac0261 100644
--- a/stack/core/src/main/resources/usergrid-core-context.xml
+++ b/stack/core/src/main/resources/usergrid-core-context.xml
@@ -138,6 +138,7 @@
 
     <bean id="simpleBatcher" class="org.apache.usergrid.count.SimpleBatcher">
         <property name="batchSubmitter" ref="batchSubmitter"/>
+        <property name="batchInterval" value="${usergrid.counter.batch.interval}"/>
         <property name="batchSize" value="${usergrid.counter.batch.size}"/>
     </bean>
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117ab5c5/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
index b16b943..079df0d 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
@@ -271,4 +271,54 @@ public class CounterIT extends AbstractCoreIT {
         assertEquals( 1,
             r.getCounters().get( 0 ).getValues().get( 0 ).getValue() - originalCount );
     }
+
+
+
+
+    @Test
+    public void testTimedFlush() throws Exception {
+        LOG.info( "CounterIT.testCounters" );
+
+        EntityManager em = app.getEntityManager();
+
+
+        assertNotNull( em );
+
+
+        UUID user1 = UUID.randomUUID();
+        UUID user2 = UUID.randomUUID();
+        // UUID groupId = UUID.randomUUID();
+
+
+        Event event = null;
+
+        for ( int i = 0; i < 100; i++ ) {
+            event = new Event();
+            event.setTimestamp( ts + ( i * 60 * 1000 ) );
+            event.addCounter( "visits", 1 );
+            event.setUser( user1 );
+            em.create( event );
+
+            event = new Event();
+            event.setTimestamp( ts + ( i * 60 * 1000 ) );
+            event.addCounter( "visits", 1 );
+            event.setUser( user2 );
+            em.create( event );
+        }
+
+        //sleep to ensure the flush has executed
+        Thread.sleep( 30000 );
+
+        Results r = em.getAggregateCounters( null, null, null, "visits", CounterResolution.SIX_HOUR, ts, System.currentTimeMillis(), false );
+
+        final AggregateCounterSet counter = r.getCounters().get( 0 );
+
+        final long count = counter.getValues().get( 0 ).getValue();
+
+        final String name = counter.getName();
+
+        assertEquals("visits", name);
+        assertEquals(count, 200);
+
+    }
 }


[2/2] incubator-usergrid git commit: Fixes incorrect removal of default property

Posted by to...@apache.org.
Fixes incorrect removal of default property


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/13f6d234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/13f6d234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/13f6d234

Branch: refs/heads/USERGRID-907-2.0
Commit: 13f6d2344e14b6840a52e2f5a04eceb0af185119
Parents: 117ab5c
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 11 16:21:13 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 11 16:36:37 2015 -0600

----------------------------------------------------------------------
 stack/config/src/main/resources/usergrid-default.properties      | 2 +-
 .../src/test/java/org/apache/usergrid/persistence/CounterIT.java | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/13f6d234/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 3bcb821..33889ac 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -570,7 +570,7 @@ usergrid.counter.batch.size=1000
 
 # Set the time interval for when to submit counter batches.
 #
-#usergrid.counter.batch.interval=30
+usergrid.counter.batch.interval=30
 
 # Set build number for display
 # Note: ${version is obtained from the pom.xml <version>}

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/13f6d234/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
index 079df0d..d1abde5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
@@ -290,7 +290,7 @@ public class CounterIT extends AbstractCoreIT {
         // UUID groupId = UUID.randomUUID();
 
 
-        Event event = null;
+        Event event;
 
         for ( int i = 0; i < 100; i++ ) {
             event = new Event();
@@ -318,7 +318,7 @@ public class CounterIT extends AbstractCoreIT {
         final String name = counter.getName();
 
         assertEquals("visits", name);
-        assertEquals(count, 200);
+        assertEquals(200, count);
 
     }
 }