You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/12 00:08:31 UTC
incubator-usergrid git commit: Adds a simple 30 second flush interval
task
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-907 [created] 9bebb069b
Adds a simple 30 second flush interval task
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/9bebb069
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/9bebb069
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/9bebb069
Branch: refs/heads/USERGRID-907
Commit: 9bebb069bd7fef26dcc81205c0384588593df064
Parents: 566046b
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 11 16:08:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 11 16:08:30 2015 -0600
----------------------------------------------------------------------
.../apache/usergrid/count/AbstractBatcher.java | 70 +++++++++++++++-----
.../main/resources/usergrid-core-context.xml | 1 +
.../apache/usergrid/persistence/CounterIT.java | 50 ++++++++++++++
3 files changed, 106 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
index cd2d2e9..e7dd439 100644
--- a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
+++ b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
@@ -21,12 +21,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.count.common.Count;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
@@ -42,17 +48,31 @@ import com.yammer.metrics.core.TimerContext;
public abstract class AbstractBatcher implements Batcher {
protected BatchSubmitter batchSubmitter;
+ protected static final Logger logger = LoggerFactory.getLogger( AbstractBatcher.class );
+
private volatile Batch batch;
private final AtomicLong opCount = new AtomicLong();
private final Timer addTimer =
Metrics.newTimer( AbstractBatcher.class, "add_invocation", TimeUnit.MICROSECONDS, TimeUnit.SECONDS );
protected final Counter invocationCounter = Metrics.newCounter( AbstractBatcher.class, "batch_add_invocations" );
- private final Counter existingCounterHit = Metrics.newCounter( AbstractBatcher.class, "counter_existed" );
// TODO add batchCount, remove shouldSubmit, impl submit, change simpleBatcher to just be an extension
protected int batchSize = 500;
+ protected int batchIntervalSeconds = 10;
private final AtomicLong batchSubmissionCount = new AtomicLong();
- private final AtomicBoolean lock = new AtomicBoolean( false );
+ /**
+ * Create our scheduler to fire our execution
+ */
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );
+
+
+ /**
+ * Set the batch interval in seconds
+ * @param batchIntervalSeconds
+ */
+ public void setBatchInterval(int batchIntervalSeconds){
+ this.batchIntervalSeconds = batchIntervalSeconds;
+ }
public void setBatchSize( int batchSize ) {
this.batchSize = batchSize;
@@ -88,16 +108,23 @@ public abstract class AbstractBatcher implements Batcher {
}
- Batch getBatch() {
+ private Batch getBatch() {
Batch active = batch;
if ( active == null ) {
synchronized ( this ) {
active = batch;
if ( active == null ) {
batch = active = new Batch();
+
+ //now schedule our task for execution since we're creating a batch
+ scheduler.scheduleWithFixedDelay( new BatchFlusher(), this.batchIntervalSeconds,
+ this.batchIntervalSeconds, TimeUnit.SECONDS );
+
}
}
}
+
+ //we want to flush, and we have no capacity left, perform a flush
if ( batchSize > 1 && active.getCapacity() == 0 ) {
synchronized ( this ) {
if ( active.getCapacity() == 0 ) {
@@ -105,9 +132,29 @@ public abstract class AbstractBatcher implements Batcher {
}
}
}
+
return active;
}
+ private void flush(){
+ synchronized(this) {
+ getBatch().flush();
+ }
+ }
+
+
+ /**
+ * Runnable that will flush the batch every 30 seconds
+ */
+ private final class BatchFlusher implements Runnable {
+
+ @Override
+ public void run() {
+ //explicitly flush the batch
+ AbstractBatcher.this.flush();
+ }
+ }
+
public long getBatchSubmissionCount() {
return batchSubmissionCount.get();
@@ -118,11 +165,9 @@ public abstract class AbstractBatcher implements Batcher {
private BlockingQueue<Count> counts;
private final AtomicInteger localCallCounter = new AtomicInteger();
- private final AtomicBoolean lock = new AtomicBoolean( false );
-
Batch() {
- counts = new ArrayBlockingQueue<Count>( batchSize );
+ counts = new ArrayBlockingQueue<>( batchSize );
}
@@ -131,6 +176,8 @@ public abstract class AbstractBatcher implements Batcher {
}
+
+
void flush() {
ArrayList<Count> flushed = new ArrayList<Count>( batchSize );
counts.drainTo( flushed );
@@ -146,7 +193,7 @@ public abstract class AbstractBatcher implements Batcher {
counts.offer( count, 500, TimeUnit.MILLISECONDS );
}
catch ( Exception ex ) {
- ex.printStackTrace();
+ logger.error( "Unable to add count, dropping count {}", count, ex );
}
}
@@ -157,7 +204,7 @@ public abstract class AbstractBatcher implements Batcher {
f.get();
}
catch ( Exception ex ) {
- ex.printStackTrace();
+ logger.error( "Unable to add count, dropping count {}", count, ex );
}
batchSubmissionCount.incrementAndGet();
opCount.incrementAndGet();
@@ -165,12 +212,5 @@ public abstract class AbstractBatcher implements Batcher {
}
- /**
- * The number of times the {@link #add(org.apache.usergrid.count.common.Count)} method has been invoked on this batch
- * instance
- */
- public int getLocalCallCount() {
- return localCallCounter.get();
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/main/resources/usergrid-core-context.xml
----------------------------------------------------------------------
diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml
index 53ae4e2..f3eb482 100644
--- a/stack/core/src/main/resources/usergrid-core-context.xml
+++ b/stack/core/src/main/resources/usergrid-core-context.xml
@@ -134,6 +134,7 @@
<bean id="simpleBatcher" class="org.apache.usergrid.count.SimpleBatcher">
<property name="batchSubmitter" ref="batchSubmitter"/>
+ <property name="batchInterval" value="${usergrid.counter.batch.interval}"/>
<property name="batchSize" value="${usergrid.counter.batch.size}"/>
</bean>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/9bebb069/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
index a394a79..2c72d26 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
@@ -263,4 +263,54 @@ public class CounterIT extends AbstractCoreIT {
assertEquals( 1,
r.getCounters().get( 0 ).getValues().get( 0 ).getValue() - originalCount );
}
+
+
+
+
+ @Test
+ public void testTimedFlush() throws Exception {
+ LOG.info( "CounterIT.testCounters" );
+
+ EntityManager em = app.getEntityManager();
+
+
+ assertNotNull( em );
+
+
+ UUID user1 = UUID.randomUUID();
+ UUID user2 = UUID.randomUUID();
+ // UUID groupId = UUID.randomUUID();
+
+
+ Event event = null;
+
+ for ( int i = 0; i < 100; i++ ) {
+ event = new Event();
+ event.setTimestamp( ts + ( i * 60 * 1000 ) );
+ event.addCounter( "visits", 1 );
+ event.setUser( user1 );
+ em.create( event );
+
+ event = new Event();
+ event.setTimestamp( ts + ( i * 60 * 1000 ) );
+ event.addCounter( "visits", 1 );
+ event.setUser( user2 );
+ em.create( event );
+ }
+
+ //sleep to ensure the flush has executed
+ Thread.sleep( 30000 );
+
+ Results r = em.getAggregateCounters( null, null, null, "visits", CounterResolution.SIX_HOUR, ts, System.currentTimeMillis(), false );
+
+ final AggregateCounterSet counter = r.getCounters().get( 0 );
+
+ final long count = counter.getValues().get( 0 ).getValue();
+
+ final String name = counter.getName();
+
+ assertEquals("visits", name);
+ assertEquals(count, 200);
+
+ }
}