You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by to...@apache.org on 2015/08/12 00:51:29 UTC
[1/2] incubator-usergrid git commit: Adds a simple 30 second flush
interval task
Repository: incubator-usergrid
Updated Branches:
refs/heads/USERGRID-907-2.0 [created] 13f6d2344
Adds a simple 30 second flush interval task
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/117ab5c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/117ab5c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/117ab5c5
Branch: refs/heads/USERGRID-907-2.0
Commit: 117ab5c598a7ecca44f8e552d9550ece57a51541
Parents: 9f81d7b
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 11 16:08:30 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 11 16:13:08 2015 -0600
----------------------------------------------------------------------
.../apache/usergrid/count/AbstractBatcher.java | 70 +++++++++++++++-----
.../main/resources/usergrid-core-context.xml | 1 +
.../apache/usergrid/persistence/CounterIT.java | 50 ++++++++++++++
3 files changed, 106 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117ab5c5/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
index cd2d2e9..e7dd439 100644
--- a/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
+++ b/stack/core/src/main/java/org/apache/usergrid/count/AbstractBatcher.java
@@ -21,12 +21,18 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.usergrid.count.common.Count;
import com.yammer.metrics.Metrics;
import com.yammer.metrics.core.Counter;
@@ -42,17 +48,31 @@ import com.yammer.metrics.core.TimerContext;
public abstract class AbstractBatcher implements Batcher {
protected BatchSubmitter batchSubmitter;
+ protected static final Logger logger = LoggerFactory.getLogger( AbstractBatcher.class );
+
private volatile Batch batch;
private final AtomicLong opCount = new AtomicLong();
private final Timer addTimer =
Metrics.newTimer( AbstractBatcher.class, "add_invocation", TimeUnit.MICROSECONDS, TimeUnit.SECONDS );
protected final Counter invocationCounter = Metrics.newCounter( AbstractBatcher.class, "batch_add_invocations" );
- private final Counter existingCounterHit = Metrics.newCounter( AbstractBatcher.class, "counter_existed" );
// TODO add batchCount, remove shouldSubmit, impl submit, change simpleBatcher to just be an extension
protected int batchSize = 500;
+ protected int batchIntervalSeconds = 10;
private final AtomicLong batchSubmissionCount = new AtomicLong();
- private final AtomicBoolean lock = new AtomicBoolean( false );
+ /**
+ * Create our scheduler to fire our execution
+ */
+ private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool( 1 );
+
+
+ /**
+ * Set the batch interval in seconds
+ * @param batchIntervalSeconds
+ */
+ public void setBatchInterval(int batchIntervalSeconds){
+ this.batchIntervalSeconds = batchIntervalSeconds;
+ }
public void setBatchSize( int batchSize ) {
this.batchSize = batchSize;
@@ -88,16 +108,23 @@ public abstract class AbstractBatcher implements Batcher {
}
- Batch getBatch() {
+ private Batch getBatch() {
Batch active = batch;
if ( active == null ) {
synchronized ( this ) {
active = batch;
if ( active == null ) {
batch = active = new Batch();
+
+ //now schedule our task for execution since we're creating a batch
+ scheduler.scheduleWithFixedDelay( new BatchFlusher(), this.batchIntervalSeconds,
+ this.batchIntervalSeconds, TimeUnit.SECONDS );
+
}
}
}
+
+ //we want to flush, and we have no capacity left, perform a flush
if ( batchSize > 1 && active.getCapacity() == 0 ) {
synchronized ( this ) {
if ( active.getCapacity() == 0 ) {
@@ -105,9 +132,29 @@ public abstract class AbstractBatcher implements Batcher {
}
}
}
+
return active;
}
+ private void flush(){
+ synchronized(this) {
+ getBatch().flush();
+ }
+ }
+
+
+ /**
+ * Runnable that will flush the batch every 30 seconds
+ */
+ private final class BatchFlusher implements Runnable {
+
+ @Override
+ public void run() {
+ //explicitly flush the batch
+ AbstractBatcher.this.flush();
+ }
+ }
+
public long getBatchSubmissionCount() {
return batchSubmissionCount.get();
@@ -118,11 +165,9 @@ public abstract class AbstractBatcher implements Batcher {
private BlockingQueue<Count> counts;
private final AtomicInteger localCallCounter = new AtomicInteger();
- private final AtomicBoolean lock = new AtomicBoolean( false );
-
Batch() {
- counts = new ArrayBlockingQueue<Count>( batchSize );
+ counts = new ArrayBlockingQueue<>( batchSize );
}
@@ -131,6 +176,8 @@ public abstract class AbstractBatcher implements Batcher {
}
+
+
void flush() {
ArrayList<Count> flushed = new ArrayList<Count>( batchSize );
counts.drainTo( flushed );
@@ -146,7 +193,7 @@ public abstract class AbstractBatcher implements Batcher {
counts.offer( count, 500, TimeUnit.MILLISECONDS );
}
catch ( Exception ex ) {
- ex.printStackTrace();
+ logger.error( "Unable to add count, dropping count {}", count, ex );
}
}
@@ -157,7 +204,7 @@ public abstract class AbstractBatcher implements Batcher {
f.get();
}
catch ( Exception ex ) {
- ex.printStackTrace();
+ logger.error( "Unable to add count, dropping count {}", count, ex );
}
batchSubmissionCount.incrementAndGet();
opCount.incrementAndGet();
@@ -165,12 +212,5 @@ public abstract class AbstractBatcher implements Batcher {
}
- /**
- * The number of times the {@link #add(org.apache.usergrid.count.common.Count)} method has been invoked on this batch
- * instance
- */
- public int getLocalCallCount() {
- return localCallCounter.get();
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117ab5c5/stack/core/src/main/resources/usergrid-core-context.xml
----------------------------------------------------------------------
diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml
index 5235daa..4ac0261 100644
--- a/stack/core/src/main/resources/usergrid-core-context.xml
+++ b/stack/core/src/main/resources/usergrid-core-context.xml
@@ -138,6 +138,7 @@
<bean id="simpleBatcher" class="org.apache.usergrid.count.SimpleBatcher">
<property name="batchSubmitter" ref="batchSubmitter"/>
+ <property name="batchInterval" value="${usergrid.counter.batch.interval}"/>
<property name="batchSize" value="${usergrid.counter.batch.size}"/>
</bean>
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/117ab5c5/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
index b16b943..079df0d 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
@@ -271,4 +271,54 @@ public class CounterIT extends AbstractCoreIT {
assertEquals( 1,
r.getCounters().get( 0 ).getValues().get( 0 ).getValue() - originalCount );
}
+
+
+
+
+ @Test
+ public void testTimedFlush() throws Exception {
+ LOG.info( "CounterIT.testCounters" );
+
+ EntityManager em = app.getEntityManager();
+
+
+ assertNotNull( em );
+
+
+ UUID user1 = UUID.randomUUID();
+ UUID user2 = UUID.randomUUID();
+ // UUID groupId = UUID.randomUUID();
+
+
+ Event event = null;
+
+ for ( int i = 0; i < 100; i++ ) {
+ event = new Event();
+ event.setTimestamp( ts + ( i * 60 * 1000 ) );
+ event.addCounter( "visits", 1 );
+ event.setUser( user1 );
+ em.create( event );
+
+ event = new Event();
+ event.setTimestamp( ts + ( i * 60 * 1000 ) );
+ event.addCounter( "visits", 1 );
+ event.setUser( user2 );
+ em.create( event );
+ }
+
+ //sleep to ensure the flush has executed
+ Thread.sleep( 30000 );
+
+ Results r = em.getAggregateCounters( null, null, null, "visits", CounterResolution.SIX_HOUR, ts, System.currentTimeMillis(), false );
+
+ final AggregateCounterSet counter = r.getCounters().get( 0 );
+
+ final long count = counter.getValues().get( 0 ).getValue();
+
+ final String name = counter.getName();
+
+ assertEquals("visits", name);
+ assertEquals(count, 200);
+
+ }
}
[2/2] incubator-usergrid git commit: Fixes incorrect removal of
default property
Posted by to...@apache.org.
Fixes incorrect removal of default property
Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/13f6d234
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/13f6d234
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/13f6d234
Branch: refs/heads/USERGRID-907-2.0
Commit: 13f6d2344e14b6840a52e2f5a04eceb0af185119
Parents: 117ab5c
Author: Todd Nine <tn...@apigee.com>
Authored: Tue Aug 11 16:21:13 2015 -0600
Committer: Todd Nine <tn...@apigee.com>
Committed: Tue Aug 11 16:36:37 2015 -0600
----------------------------------------------------------------------
stack/config/src/main/resources/usergrid-default.properties | 2 +-
.../src/test/java/org/apache/usergrid/persistence/CounterIT.java | 4 ++--
2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/13f6d234/stack/config/src/main/resources/usergrid-default.properties
----------------------------------------------------------------------
diff --git a/stack/config/src/main/resources/usergrid-default.properties b/stack/config/src/main/resources/usergrid-default.properties
index 3bcb821..33889ac 100644
--- a/stack/config/src/main/resources/usergrid-default.properties
+++ b/stack/config/src/main/resources/usergrid-default.properties
@@ -570,7 +570,7 @@ usergrid.counter.batch.size=1000
# Set the time interval for when to submit counter batches.
#
-#usergrid.counter.batch.interval=30
+usergrid.counter.batch.interval=30
# Set build number for display
# Note: ${version is obtained from the pom.xml <version>}
http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/13f6d234/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
index 079df0d..d1abde5 100644
--- a/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
+++ b/stack/core/src/test/java/org/apache/usergrid/persistence/CounterIT.java
@@ -290,7 +290,7 @@ public class CounterIT extends AbstractCoreIT {
// UUID groupId = UUID.randomUUID();
- Event event = null;
+ Event event;
for ( int i = 0; i < 100; i++ ) {
event = new Event();
@@ -318,7 +318,7 @@ public class CounterIT extends AbstractCoreIT {
final String name = counter.getName();
assertEquals("visits", name);
- assertEquals(count, 200);
+ assertEquals(200, count);
}
}