You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/31 15:46:15 UTC

[20/37] usergrid git commit: Minor refactoring, renaming and debug logging changes.

Minor refactoring, renaming and debug logging changes.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c3a2dd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c3a2dd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c3a2dd

Branch: refs/heads/usergrid-1318-queue
Commit: f8c3a2dd87495cd4f65a455f46a187d9c7badd27
Parents: f56e1b0
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 13 10:12:33 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 13 10:12:33 2016 -0400

----------------------------------------------------------------------
 .../asyncevents/AsyncEventsSchedulerFig.java    |  9 +--
 .../usergrid/persistence/qakka/QakkaFig.java    | 12 ++--
 .../distributed/actors/ShardAllocator.java      |  5 ++
 .../impl/DistributedQueueServiceImpl.java       |  6 +-
 .../impl/MessageCounterSerializationImpl.java   | 72 +++++++++++++-------
 .../impl/QueueMessageSerializationImpl.java     |  2 +-
 .../impl/ShardCounterSerializationImpl.java     |  2 +-
 .../queue/src/test/resources/log4j.properties   |  5 +-
 .../queue/src/test/resources/qakka.properties   |  5 +-
 9 files changed, 70 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
index f696568..e556870 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
@@ -24,13 +24,9 @@ import org.safehaus.guicyfig.GuicyFig;
 import org.safehaus.guicyfig.Key;
 
 
-/**
- *
- */
 @FigSingleton
 public interface AsyncEventsSchedulerFig extends GuicyFig {
 
-
     /**
      * Amount of threads to use in async processing
      */
@@ -42,25 +38,22 @@ public interface AsyncEventsSchedulerFig extends GuicyFig {
      */
     String IO_SCHEDULER_NAME = "scheduler.io.poolName";
 
-
     /**
      * Amount of threads to use in async processing
      */
     String REPAIR_SCHEDULER_THREADS = "repair.io.threads";
 
-
     /**
      * Name of pool to use when performing scheduling
      */
     String REPAIR_SCHEDULER_NAME = "repair.io.poolName";
 
 
-
     @Default( "40" )
     @Key( IO_SCHEDULER_THREADS )
     int getMaxIoThreads();
 
-    @Default( "Usergrid-SQS-Pool" )
+    @Default( "Usergrid-Queue-Worker-Pool" )
     @Key( IO_SCHEDULER_NAME )
     String getIoSchedulerName();
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 3093c39..e6a8667 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -52,9 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
 
     String QUEUE_GET_TIMEOUT                      = "queue.get.timeout.seconds";
 
-    String QUEUE_MAX_SHARD_COUNTER                = "queue.max.inmemory.max.shard.counter";
+    String QUEUE_SHARD_COUNTER_MAX_IN_MEMORY      = "queue.shard.counter.max-in-memory";
 
-    String QUEUE_MAX_MESSAGE_CHANGES              = "queue.max.inmemory.max.message.changes";
+    String QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY    = "queue.message.counter.max-in-memory";
 
     String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis";
 
@@ -123,14 +123,14 @@ public interface QakkaFig extends GuicyFig, Serializable {
     int getSendTimeoutSeconds();
 
     /** Once counter reaches this value, write it to permanent storage */
-    @Key(QUEUE_MAX_SHARD_COUNTER)
+    @Key(QUEUE_SHARD_COUNTER_MAX_IN_MEMORY)
     @Default("100")
-    long getMaxInMemoryShardCounter();
+    long getShardCounterMaxInMemory();
 
     /** Once counter reaches this value, write it to permanent storage */
-    @Key(QUEUE_MAX_MESSAGE_CHANGES)
+    @Key(QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY)
     @Default("100")
-    long getMaxInMemoryMessageCounter();
+    long getMessageCounterMaxInMemory();
 
     /** How often to check whether new shard is needed for each queue */
     @Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 1863472..19059e6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -125,6 +125,7 @@ public class ShardAllocator extends UntypedActor {
             long counterValue = 0;
             try {
                 counterValue = shardCounterSerialization.getCounterValue( queueName, type, shard.getShardId() );
+
             } catch ( NotFoundException ignored ) {}
 
             if (counterValue > (0.9 * qakkaFig.getMaxShardSize())) {
@@ -140,6 +141,10 @@ public class ShardAllocator extends UntypedActor {
 
                 logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
                         this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );
+
+            } else {
+//                logger.debug("No new shard for queue {} counterValue {} of max {}",
+//                    queueName, counterValue, qakkaFig.getMaxShardSize() );
             }
 
         } catch ( Throwable t ) {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 20bf608..e2c5c2c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -241,9 +241,9 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                 }
             }
 
-            if ( ret.isEmpty() ) {
-                logger.info( "Requested {} but queue '{}' is empty", count, queueName);
-            }
+//            if ( ret.isEmpty() ) {
+//                logger.info( "Requested {} but queue '{}' is empty", count, queueName);
+//            }
             return ret;
 
         } finally {

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index ee4bab2..36175c5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -78,19 +78,35 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
     class InMemoryCount {
         long baseCount;
+        final AtomicLong totalInMemoryCount = new AtomicLong( 0L ); // for testing using only in-memory counter
         final AtomicLong increment = new AtomicLong( 0L );
         final AtomicLong decrement = new AtomicLong( 0L );
 
         InMemoryCount( long baseCount ) {
             this.baseCount = baseCount;
         }
-        public AtomicLong getIncrement() {
-            return increment;
+        public void increment( long inc ) {
+            increment.addAndGet( inc );
+            totalInMemoryCount.addAndGet( inc );
         }
-        public AtomicLong getDecrement() {
-            return decrement;
+        public void decrement( long dec ) {
+            decrement.addAndGet( dec );
+            totalInMemoryCount.addAndGet( -dec );
+        }
+        public long getIncrement() {
+            return increment.get();
+        }
+        public long getDecrement() {
+            return decrement.get();
+        }
+        public void clearDeltas() {
+            increment.set( 0L );
+            decrement.set( 0L );
         }
         public long value() {
+
+            // return totalInMemoryCount.get(); // for testing using just in-memory counter:
+
             return baseCount + increment.get() - decrement.get();
         }
         void setBaseCount( long baseCount ) {
@@ -106,7 +122,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
         CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
 
         this.cassandraConfig = cassandraConfig;
-        this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter();
+        this.maxChangesBeforeSave = qakkaFig.getMessageCounterMaxInMemory();
         this.cassandraClient = cassandraClient;
     }
 
@@ -139,13 +155,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
         InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
         synchronized ( inMemoryCount ) {
-            inMemoryCount.getIncrement().addAndGet( increment );
-
-            //logger.info("Incremented Count for queue {} type {} = {}",
-            //queueName, type, getCounterValue( queueName, type ));
-
+            inMemoryCount.increment( increment );
             saveIfNeeded( queueName, type );
         }
+
+        if ( logger.isDebugEnabled() ) {
+            long value = inMemoryCounters.get( key ).value();
+            if (value <= 0) {
+                logger.debug( "Queue {} type {} decremented count = {}", queueName, type, value );
+            }
+        }
     }
 
 
@@ -172,14 +191,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
         InMemoryCount inMemoryCount = inMemoryCounters.get( key );
 
         synchronized ( inMemoryCount ) {
-
-            inMemoryCount.getDecrement().addAndGet( decrement );
-
-            //logger.info("Decremented Count for queue {} type {} = {}",
-                //queueName, type, getCounterValue( queueName, type ));
-
+            inMemoryCount.decrement( decrement );
             saveIfNeeded( queueName, type );
         }
+
+        if ( logger.isDebugEnabled() ) {
+            long value = inMemoryCounters.get( key ).value();
+            if (value <= 0) {
+                logger.debug( "Queue {} type {} incremented count = {}", queueName, type, value );
+            }
+        }
     }
 
 
@@ -194,14 +215,14 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
             if ( value == null ) {
                 throw new NotFoundException(
-                        MessageFormat.format( "No counter found for queue {0} type {1}",
-                                queueName, type ));
+                        MessageFormat.format( "No counter found for queue {0} type {1}", queueName, type ));
             } else {
                 inMemoryCounters.put( key, new InMemoryCount( value ));
             }
         }
 
-        return inMemoryCounters.get( key ).value();
+        long value = inMemoryCounters.get( key ).value();
+        return value;
     }
 
 
@@ -253,15 +274,18 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
 
         if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
 
-            long totalIncrement = inMemoryCount.getIncrement().get();
+            long totalIncrement = inMemoryCount.getIncrement();
             incrementCounterInStorage( queueName, type, totalIncrement );
 
-            long totalDecrement = inMemoryCount.getDecrement().get();
+            long totalDecrement = inMemoryCount.getDecrement();
             decrementCounterInStorage( queueName, type, totalDecrement );
 
-            inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
-            inMemoryCount.getIncrement().set( 0L );
-            inMemoryCount.getDecrement().set( 0L );
+            long baseCount = retrieveCounterFromStorage( queueName, type );
+
+            logger.debug("Writing queue counter {} type {} to storage count = {}", queueName, type, baseCount );
+
+            inMemoryCount.setBaseCount( baseCount );
+            inMemoryCount.clearDeltas();
 
             numChanges.set( 0 );
         }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index 708132c..a174dd0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -179,7 +179,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
             Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
                     Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
             Shard shard = shardStrategy.selectShard(
-                    queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId );
+                    queueName, region, shardType, queueMessageId );
             shardId = shard.getShardId();
         } else {
             shardId = shardIdOrNull;

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index f303f43..276498d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -94,7 +94,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
     public ShardCounterSerializationImpl(
         CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
         this.cassandraConfig = cassandraConfig;
-        this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+        this.maxInMemoryIncrement = qakkaFig.getShardCounterMaxInMemory();
         this.cassandraClient = cassandraClient;
     }
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index c7d53a3..d542096 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -24,7 +24,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n
 log4j.logger.org.apache.cassandra=WARN
 log4j.logger.org.glassfish=WARN
 
-log4j.logger.org.apache.usergrid=INFO
-#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-#log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
+log4j.logger.org.apache.usergrid.persistence.qakka=INFO
+log4j.logger.org.apache.usergrid.persistence.queue=INFO
 log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO

http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 95b2509..464b48d 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -48,8 +48,9 @@ queue.shard.max.size=10
 queue.shard.allocation.check.frequency.millis=100
 queue.shard.allocation.advance.time.millis=200
 
-queue.max.inmemory.shard.counter = 100
-queue.max.inmemory.max.message.changes=3
+# set low for testing purposes
+queue.shard.counter.max-in-memory=10
+queue.message.counter.max-in-memory=10
 
 queue.long.polling.time.millis=2000