You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/11 14:47:57 UTC

[1/2] usergrid git commit: Another counter concurrency fix, plus test stabilization changes

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 9f2863fd6 -> 775257d27


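Another counter concurrency fix, plus test stabilization changes

Counter fix: in MessageCounterSerializationImpl the map-wide lock on
inMemoryCounters now covers only the lazy creation of a counter entry; the
increment/decrement and the saveIfNeeded() call run together under a lock on
the individual InMemoryCount, and saveIfNeeded() no longer takes that lock
itself.

Test changes: QueueActorServiceTest now checks
queueMessageManager.getQueueDepth() instead of inMemoryQueue.size(), and the
test qakka.properties sets queue.send.timeout.seconds and
queue.get.timeout.seconds to 5.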


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/041109fb
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/041109fb
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/041109fb

Branch: refs/heads/usergrid-1318-queue
Commit: 041109fb16287d1e6194f29658d7445c8058fc90
Parents: 9f2863f
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Oct 11 08:55:12 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Oct 11 08:55:12 2016 -0400

----------------------------------------------------------------------
 .../impl/MessageCounterSerializationImpl.java   | 81 ++++++++++----------
 .../distributed/QueueActorServiceTest.java      | 17 ++--
 .../queue/src/test/resources/qakka.properties   |  3 +
 3 files changed, 53 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index 2eb482a..ee4bab2 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -123,25 +123,29 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         synchronized ( inMemoryCounters ) {
 
-            if ( inMemoryCounters.get( key ) == null ) {
+            if (inMemoryCounters.get( key ) == null) {
 
                 Long value = retrieveCounterFromStorage( queueName, type );
 
-                if ( value == null ) {
+                if (value == null) {
                     incrementCounterInStorage( queueName, type, 0L );
-                    inMemoryCounters.put( key, new InMemoryCount( 0L ));
+                    inMemoryCounters.put( key, new InMemoryCount( 0L ) );
                 } else {
-                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                    inMemoryCounters.put( key, new InMemoryCount( value ) );
                 }
             }
+        }
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
-            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+        synchronized ( inMemoryCount ) {
             inMemoryCount.getIncrement().addAndGet( increment );
 
-//            logger.info("Incremented Count for queue {} type {} = {}",
-//                queueName, type, getCounterValue( queueName, type ));
+            //logger.info("Incremented Count for queue {} type {} = {}",
+            //queueName, type, getCounterValue( queueName, type ));
+
+            saveIfNeeded( queueName, type );
         }
-        saveIfNeeded( queueName, type );
     }
 
 
@@ -152,25 +156,30 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         synchronized ( inMemoryCounters ) {
 
-            if ( inMemoryCounters.get( key ) == null ) {
+            if (inMemoryCounters.get( key ) == null) {
 
                 Long value = retrieveCounterFromStorage( queueName, type );
 
-                if ( value == null ) {
+                if (value == null) {
                     decrementCounterInStorage( queueName, type, 0L );
-                    inMemoryCounters.put( key, new InMemoryCount( 0L ));
+                    inMemoryCounters.put( key, new InMemoryCount( 0L ) );
                 } else {
-                    inMemoryCounters.put( key, new InMemoryCount( value ));
+                    inMemoryCounters.put( key, new InMemoryCount( value ) );
                 }
             }
+        }
+
+        InMemoryCount inMemoryCount = inMemoryCounters.get( key );
+
+        synchronized ( inMemoryCount ) {
 
-            InMemoryCount inMemoryCount = inMemoryCounters.get( key );
             inMemoryCount.getDecrement().addAndGet( decrement );
 
-//            logger.info("Decremented Count for queue {} type {} = {}",
-//                queueName, type, getCounterValue( queueName, type ));
+            //logger.info("Decremented Count for queue {} type {} = {}",
+                //queueName, type, getCounterValue( queueName, type ));
+
+            saveIfNeeded( queueName, type );
         }
-        saveIfNeeded( queueName, type );
     }
 
 
@@ -179,19 +188,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         String key = buildKey( queueName, type );
 
-        synchronized ( inMemoryCounters ) {
+        if ( inMemoryCounters.get( key ) == null ) {
 
-            if ( inMemoryCounters.get( key ) == null ) {
+            Long value = retrieveCounterFromStorage( queueName, type );
 
-                Long value = retrieveCounterFromStorage( queueName, type );
-
-                if ( value == null ) {
-                    throw new NotFoundException(
-                            MessageFormat.format( "No counter found for queue {0} type {1}",
-                                    queueName, type ));
-                } else {
-                    inMemoryCounters.put( key, new InMemoryCount( value ));
-                }
+            if ( value == null ) {
+                throw new NotFoundException(
+                        MessageFormat.format( "No counter found for queue {0} type {1}",
+                                queueName, type ));
+            } else {
+                inMemoryCounters.put( key, new InMemoryCount( value ));
             }
         }
 
@@ -245,22 +251,19 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
-        synchronized ( inMemoryCount ) {
-
-            if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
+        if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
 
-                long totalIncrement = inMemoryCount.getIncrement().get();
-                incrementCounterInStorage( queueName, type, totalIncrement );
+            long totalIncrement = inMemoryCount.getIncrement().get();
+            incrementCounterInStorage( queueName, type, totalIncrement );
 
-                long totalDecrement = inMemoryCount.getDecrement().get();
-                decrementCounterInStorage( queueName, type, totalDecrement );
+            long totalDecrement = inMemoryCount.getDecrement().get();
+            decrementCounterInStorage( queueName, type, totalDecrement );
 
-                inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
-                inMemoryCount.getIncrement().set( 0L );
-                inMemoryCount.getDecrement().set( 0L );
+            inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
+            inMemoryCount.getIncrement().set( 0L );
+            inMemoryCount.getDecrement().set( 0L );
 
-                numChanges.set( 0 );
-            }
+            numChanges.set( 0 );
         }
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
index 7fe8b16..f5512e5 100644
--- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
+++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java
@@ -152,32 +152,31 @@ public class QueueActorServiceTest extends AbstractTest {
                     queueName, region, region, messageId, null, null );
             }
 
-            int maxRetries = 25;
+            int maxRetries = 10;
             int retries = 0;
-            int count = 0;
+            long count = 0;
             while (retries++ < maxRetries) {
                 distributedQueueService.refresh();
-                if ( queueMessageManager.getQueueDepth(  queueName ) == 100 ) {
-                    count = 100;
+                count = queueMessageManager.getQueueDepth(  queueName );
+                if ( count == 100 ) {
                     break;
                 }
-                count = inMemoryQueue.size( queueName );
                 Thread.sleep( 1000 );
             }
 
             Assert.assertEquals( 100, count );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-            Assert.assertEquals( 75, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 75, queueMessageManager.getQueueDepth(  queueName ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-            Assert.assertEquals( 50, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 50, queueMessageManager.getQueueDepth(  queueName ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-            Assert.assertEquals( 25, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 25, queueMessageManager.getQueueDepth(  queueName ) );
 
             Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
-            Assert.assertEquals( 0, inMemoryQueue.size( queueName ) );
+            Assert.assertEquals( 0,  queueMessageManager.getQueueDepth(  queueName ) );
 
             distributedQueueService.shutdown();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/041109fb/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 142138d..95b2509 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -40,6 +40,9 @@ queue.num.actors=50
 queue.sender.num.actors=100
 queue.writer.num.actors=100
 
+queue.send.timeout.seconds=5
+queue.get.timeout.seconds=5
+
 # set shard size and times low for testing purposes
 queue.shard.max.size=10
 queue.shard.allocation.check.frequency.millis=100


[2/2] usergrid git commit: Fix unique values code to use GuiceActorProducer correctly.

Posted by sn...@apache.org.
Fix unique values code to use GuiceActorProducer correctly.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/775257d2
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/775257d2
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/775257d2

Branch: refs/heads/usergrid-1318-queue
Commit: 775257d27bfd3a3354fc5da4803530636413c76d
Parents: 041109f
Author: Dave Johnson <sn...@apache.org>
Authored: Tue Oct 11 10:01:03 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Tue Oct 11 10:01:03 2016 -0400

----------------------------------------------------------------------
 .../collection/uniquevalues/UniqueValueActor.java        | 11 +++++------
 .../collection/uniquevalues/UniqueValuesRouter.java      |  8 ++++----
 .../collection/uniquevalues/UniqueValuesServiceImpl.java |  4 ++--
 3 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/775257d2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
index 74f45eb..93b6ddb 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValueActor.java
@@ -17,6 +17,7 @@
 package org.apache.usergrid.persistence.collection.uniquevalues;
 
 import akka.actor.UntypedActor;
+import com.google.inject.Inject;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
 import org.apache.usergrid.persistence.core.scope.ApplicationScope;
@@ -34,8 +35,6 @@ public class UniqueValueActor extends UntypedActor {
 
     private final String name = RandomStringUtils.randomAlphanumeric( 4 );
 
-    //private MetricsService metricsService;
-
     private final ActorSystemManager actorSystemManager;
 
     private final UniqueValuesTable table;
@@ -43,11 +42,11 @@ public class UniqueValueActor extends UntypedActor {
     private int count = 0;
 
 
-    public UniqueValueActor() {
+    @Inject
+    public UniqueValueActor( UniqueValuesTable table, ActorSystemManager actorSystemManager ) {
 
-        // TODO: is there a way to avoid this ugly kludge? see also: ClusterSingletonRouter
-        this.table = UniqueValuesServiceImpl.injector.getInstance( UniqueValuesTable.class );
-        this.actorSystemManager = UniqueValuesServiceImpl.injector.getInstance( ActorSystemManager.class );
+        this.table = table;
+        this.actorSystemManager = actorSystemManager;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/775257d2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
index 355320b..17cbbb5 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesRouter.java
@@ -24,6 +24,7 @@ import akka.routing.FromConfig;
 import com.google.inject.Inject;
 import com.google.inject.Injector;
 import org.apache.commons.lang.RandomStringUtils;
+import org.apache.usergrid.persistence.actorsystem.GuiceActorProducer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,10 +41,9 @@ public class UniqueValuesRouter extends UntypedActor {
     @Inject
     public UniqueValuesRouter() {
 
-        router = getContext().actorOf(
-            FromConfig.getInstance().props(
-                Props.create( UniqueValueActor.class)
-                    .withDispatcher("akka.blocking-io-dispatcher")), "router");
+        router = getContext().actorOf( FromConfig.getInstance().props(
+            Props.create( GuiceActorProducer.class, UniqueValueActor.class)
+                .withDispatcher("akka.blocking-io-dispatcher")), "router");
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/usergrid/blob/775257d2/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
index 4562998..47a5156 100644
--- a/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
+++ b/stack/corepersistence/collection/src/main/java/org/apache/usergrid/persistence/collection/uniquevalues/UniqueValuesServiceImpl.java
@@ -52,7 +52,6 @@ import java.util.concurrent.TimeUnit;
 public class UniqueValuesServiceImpl implements UniqueValuesService {
     private static final Logger logger = LoggerFactory.getLogger( UniqueValuesServiceImpl.class );
 
-    static Injector          injector;
     UniqueValuesFig          uniqueValuesFig;
     ActorSystemManager       actorSystemManager;
     UniqueValuesTable        table;
@@ -66,13 +65,14 @@ public class UniqueValuesServiceImpl implements UniqueValuesService {
         ActorSystemManager actorSystemManager,
         UniqueValuesTable table ) {
 
-        injector = inj;
         this.actorSystemManager = actorSystemManager;
         this.uniqueValuesFig = uniqueValuesFig;
         this.table = table;
 
         ReservationCache.init( uniqueValuesFig.getUniqueValueCacheTtl() );
         this.reservationCache = ReservationCache.getInstance();
+
+        GuiceActorProducer.INJECTOR = inj;
     }