You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/11 14:47:57 UTC
[1/2] usergrid git commit: Another counter concurrency fix, plus test stabilization changes
Repository: usergrid
Updated Branches:
refs/heads/usergrid-1318-queue 9f2863fd6 -> 775257d27
Another counter concurrency fix, plus test stabilization changes
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/041109fb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/041109fb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/041109fb
Branch: refs/heads/usergrid-1318-queue
Commit: 041109fb16287d1e6194f29658d7445c8058fc90
Parents: 9f2863f
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Oct 11 08:55:12 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Oct 11 08:55:12 2016 -0400
----------------------------------------------------------------------
.../impl/MessageCounterSerializationImpl.java | 81 ++++++++++----------
.../distributed/QueueActorServiceTest.java | 17 ++--
.../queue/src/test/resources/qakka.properties | 3 +
3 files changed, 53 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 2eb482a..ee4bab2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -123,25 +123,29 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
synchronized ( inMemoryCounters ) {
- if ( inMemoryCounters.get( key ) == null ) {
+ if (inMemoryCounters.get( key ) == null) {
Long value = retrieveCounterFromStorage( queueName, type );
- if ( value == null ) {
+ if (value == null) {
incrementCounterInStorage( queueName, type, 0L );
- inMemoryCounters.put( key, new InMemoryCount( 0L ));
+ inMemoryCounters.put( key, new InMemoryCount( 0L ) );
} else {
- inMemoryCounters.put( key, new InMemoryCount( value ));
+ inMemoryCounters.put( key, new InMemoryCount( value ) );
}
}
+ }
+
+ InMemoryCount inMemoryCount = inMemoryCounters.get( key );
- InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+ synchronized ( inMemoryCount ) {
inMemoryCount.getIncrement().addAndGet( increment );
-// logger.info("Incremented Count for queue {} type {} = {}",
-// queueName, type, getCounterValue( queueName, type ));
+ //logger.info("Incremented Count for queue {} type {} = {}",
+ //queueName, type, getCounterValue( queueName, type ));
+
+ saveIfNeeded( queueName, type );
}
- saveIfNeeded( queueName, type );
}
@@ -152,25 +156,30 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
synchronized ( inMemoryCounters ) {
- if ( inMemoryCounters.get( key ) == null ) {
+ if (inMemoryCounters.get( key ) == null) {
Long value = retrieveCounterFromStorage( queueName, type );
- if ( value == null ) {
+ if (value == null) {
decrementCounterInStorage( queueName, type, 0L );
- inMemoryCounters.put( key, new InMemoryCount( 0L ));
+ inMemoryCounters.put( key, new InMemoryCount( 0L ) );
} else {
- inMemoryCounters.put( key, new InMemoryCount( value ));
+ inMemoryCounters.put( key, new InMemoryCount( value ) );
}
}
+ }
+
+ InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+ synchronized ( inMemoryCount ) {
- InMemoryCount inMemoryCount = inMemoryCounters.get( key );
inMemoryCount.getDecrement().addAndGet( decrement );
-// logger.info("Decremented Count for queue {} type {} = {}",
-// queueName, type, getCounterValue( queueName, type ));
+ //logger.info("Decremented Count for queue {} type {} = {}",
+ //queueName, type, getCounterValue( queueName, type ));
+
+ saveIfNeeded( queueName, type );
}
- saveIfNeeded( queueName, type );
}
@@ -179,19 +188,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
String key = buildKey( queueName, type );
- synchronized ( inMemoryCounters ) {
+ if ( inMemoryCounters.get( key ) == null ) {
- if ( inMemoryCounters.get( key ) == null ) {
+ Long value = retrieveCounterFromStorage( queueName, type );
- Long value = retrieveCounterFromStorage( queueName, type );
-
- if ( value == null ) {
- throw new NotFoundException(
- MessageFormat.format( "No counter found for queue {0} type {1}",
- queueName, type ));
- } else {
- inMemoryCounters.put( key, new InMemoryCount( value ));
- }
+ if ( value == null ) {
+ throw new NotFoundException(
+ MessageFormat.format( "No counter found for queue {0} type {1}",
+ queueName, type ));
+ } else {
+ inMemoryCounters.put( key, new InMemoryCount( value ));
}
}
@@ -245,22 +251,19 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
InMemoryCount inMemoryCount = inMemoryCounters.get( key );
- synchronized ( inMemoryCount ) {
-
- if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
+ if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
- long totalIncrement = inMemoryCount.getIncrement().get();
- incrementCounterInStorage( queueName, type, totalIncrement );
+ long totalIncrement = inMemoryCount.getIncrement().get();
+ incrementCounterInStorage( queueName, type, totalIncrement );
- long totalDecrement = inMemoryCount.getDecrement().get();
- decrementCounterInStorage( queueName, type, totalDecrement );
+ long totalDecrement = inMemoryCount.getDecrement().get();
+ decrementCounterInStorage( queueName, type, totalDecrement );
- inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
- inMemoryCount.getIncrement().set( 0L );
- inMemoryCount.getDecrement().set( 0L );
+ inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
+ inMemoryCount.getIncrement().set( 0L );
+ inMemoryCount.getDecrement().set( 0L );
- numChanges.set( 0 );
- }
+ numChanges.set( 0 );
}
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 7fe8b16..f5512e5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -152,32 +152,31 @@ public class QueueActorServiceTest extends AbstractTest {
queueName, region, region, messageId, null, null );
}
- int maxRetries = 25;
+ int maxRetries = 10;
int retries = 0;
- int count = 0;
+ long count = 0;
while (retries++ < maxRetries) {
distributedQueueService.refresh();
- if ( queueMessageManager.getQueueDepth( queueName ) == 100 ) {
- count = 100;
+ count = queueMessageManager.getQueueDepth( queueName );
+ if ( count == 100 ) {
break;
}
- count = inMemoryQueue.size( queueName );
Thread.sleep( 1000 );
}
Assert.assertEquals( 100, count );
Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 75, queueMessageManager.getQueueDepth( queueName ) );
Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 50, queueMessageManager.getQueueDepth( queueName ) );
Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 25, queueMessageManager.getQueueDepth( queueName ) );
Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
- Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+ Assert.assertEquals( 0, queueMessageManager.getQueueDepth( queueName ) );
distributedQueueService.shutdown();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 142138d..95b2509 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -40,6 +40,9 @@ queue.num.actors=50
queue.sender.num.actors=100
queue.writer.num.actors=100
+queue.send.timeout.seconds=5
+queue.get.timeout.seconds=5
+
# set shard size and times low for testing purposes
queue.shard.max.size=10
queue.shard.allocation.check.frequency.millis=100
[2/2] usergrid git commit: Fix unique values code to use GuiceActorProducer correctly.
Posted by sn...@apache.org.
Fix unique values code to use GuiceActorProducer correctly.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/775257d2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/775257d2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/775257d2
Branch: refs/heads/usergrid-1318-queue
Commit: 775257d27bfd3a3354fc5da4803530636413c76d
Parents: 041109f
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Oct 11 10:01:03 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Oct 11 10:01:03 2016 -0400
----------------------------------------------------------------------
.../collection/uniquevalues/UniqueValueActor.java | 11 +++++------
.../collection/uniquevalues/UniqueValuesRouter.java | 8 ++++----
.../collection/uniquevalues/UniqueValuesServiceImpl.java | 4 ++--
3 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/775257d2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index 74f45eb..93b6ddb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -17,6 +17,7 @@
package org.apache.usergrid.persistence.collection.uniquevalues;
import akka.actor.UntypedActor;
+import com.google.inject.Inject;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -34,8 +35,6 @@ public class UniqueValueActor extends UntypedActor {
private final String name = RandomStringUtils.randomAlphanumeric( 4 );
- //private MetricsService metricsService;
-
private final ActorSystemManager actorSystemManager;
private final UniqueValuesTable table;
@@ -43,11 +42,11 @@ public class UniqueValueActor extends UntypedActor {
private int count = 0;
- public UniqueValueActor() {
+ @Inject
+ public UniqueValueActor( UniqueValuesTable table, ActorSystemManager actorSystemManager ) {
- // TODO: is there a way to avoid this ugly kludge? see also: ClusterSingletonRouter
- this.table = UniqueValuesServiceImpl.injector.getInstance( UniqueValuesTable.class );
- this.actorSystemManager = UniqueValuesServiceImpl.injector.getInstance( ActorSystemManager.class );
+ this.table = table;
+ this.actorSystemManager = actorSystemManager;
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/775257d2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
index 355320b..17cbbb5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
@@ -24,6 +24,7 @@ import akka.routing.FromConfig;
import com.google.inject.Inject;
import com.google.inject.Injector;
import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,10 +41,9 @@ public class UniqueValuesRouter extends UntypedActor {
@Inject
public UniqueValuesRouter() {
- router = getContext().actorOf(
- FromConfig.getInstance().props(
- Props.create( UniqueValueActor.class)
- .withDispatcher("akka.blocking-io-dispatcher")), "router");
+ router = getContext().actorOf( FromConfig.getInstance().props(
+ Props.create( GuiceActorProducer.class, UniqueValueActor.class)
+ .withDispatcher("akka.blocking-io-dispatcher")), "router");
}
@Override
http://git-wip-us.apache.org/repos/asf/usergrid/blob/775257d2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 4562998..47a5156 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit;
public class UniqueValuesServiceImpl implements UniqueValuesService {
private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class );
- static Injector injector;
UniqueValuesFig uniqueValuesFig;
ActorSystemManager actorSystemManager;
UniqueValuesTable table;
@@ -66,13 +65,14 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
ActorSystemManager actorSystemManager,
UniqueValuesTable table ) {
- injector = inj;
this.actorSystemManager = actorSystemManager;
this.uniqueValuesFig = uniqueValuesFig;
this.table = table;
ReservationCache.init( uniqueValuesFig.getUniqueValueCacheTtl() );
this.reservationCache = ReservationCache.getInstance();
+
+ GuiceActorProducer.INJECTOR = inj;
}