You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/31 15:46:15 UTC
[20/37] usergrid git commit: Minor refactoring,
renaming and debug logging changes.
Minor refactoring, renaming and debug logging changes.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/f8c3a2dd
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/f8c3a2dd
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/f8c3a2dd
Branch: refs/heads/usergrid-1318-queue
Commit: f8c3a2dd87495cd4f65a455f46a187d9c7badd27
Parents: f56e1b0
Author: Dave Johnson <sn...@apache.org>
Authored: Thu Oct 13 10:12:33 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Thu Oct 13 10:12:33 2016 -0400
----------------------------------------------------------------------
.../asyncevents/AsyncEventsSchedulerFig.java | 9 +--
.../usergrid/persistence/qakka/QakkaFig.java | 12 ++--
.../distributed/actors/ShardAllocator.java | 5 ++
.../impl/DistributedQueueServiceImpl.java | 6 +-
.../impl/MessageCounterSerializationImpl.java | 72 +++++++++++++-------
.../impl/QueueMessageSerializationImpl.java | 2 +-
.../impl/ShardCounterSerializationImpl.java | 2 +-
.../queue/src/test/resources/log4j.properties | 5 +-
.../queue/src/test/resources/qakka.properties | 5 +-
9 files changed, 70 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
index f696568..e556870 100644
--- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
+++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AsyncEventsSchedulerFig.java
@@ -24,13 +24,9 @@ import org.safehaus.guicyfig.GuicyFig;
import org.safehaus.guicyfig.Key;
-/**
- *
- */
@FigSingleton
public interface AsyncEventsSchedulerFig extends GuicyFig {
-
/**
* Amount of threads to use in async processing
*/
@@ -42,25 +38,22 @@ public interface AsyncEventsSchedulerFig extends GuicyFig {
*/
String IO_SCHEDULER_NAME = "scheduler.io.poolName";
-
/**
* Amount of threads to use in async processing
*/
String REPAIR_SCHEDULER_THREADS = "repair.io.threads";
-
/**
* Name of pool to use when performing scheduling
*/
String REPAIR_SCHEDULER_NAME = "repair.io.poolName";
-
@Default( "40" )
@Key( IO_SCHEDULER_THREADS )
int getMaxIoThreads();
- @Default( "Usergrid-SQS-Pool" )
+ @Default( "Usergrid-Queue-Worker-Pool" )
@Key( IO_SCHEDULER_NAME )
String getIoSchedulerName();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
index 3093c39..e6a8667 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/QakkaFig.java
@@ -52,9 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable {
String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds";
- String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.max.shard.counter";
+ String QUEUE_SHARD_COUNTER_MAX_IN_MEMORY = "queue.shard.counter.max-in-memory";
- String QUEUE_MAX_MESSAGE_CHANGES = "queue.max.inmemory.max.message.changes";
+ String QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY = "queue.message.counter.max-in-memory";
String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis";
@@ -123,14 +123,14 @@ public interface QakkaFig extends GuicyFig, Serializable {
int getSendTimeoutSeconds();
/** Once counter reaches this value, write it to permanent storage */
- @Key(QUEUE_MAX_SHARD_COUNTER)
+ @Key(QUEUE_SHARD_COUNTER_MAX_IN_MEMORY)
@Default("100")
- long getMaxInMemoryShardCounter();
+ long getShardCounterMaxInMemory();
/** Once counter reaches this value, write it to permanent storage */
- @Key(QUEUE_MAX_MESSAGE_CHANGES)
+ @Key(QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY)
@Default("100")
- long getMaxInMemoryMessageCounter();
+ long getMessageCounterMaxInMemory();
/** How often to check whether new shard is needed for each queue */
@Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
index 1863472..19059e6 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/ShardAllocator.java
@@ -125,6 +125,7 @@ public class ShardAllocator extends UntypedActor {
long counterValue = 0;
try {
counterValue = shardCounterSerialization.getCounterValue( queueName, type, shard.getShardId() );
+
} catch ( NotFoundException ignored ) {}
if (counterValue > (0.9 * qakkaFig.getMaxShardSize())) {
@@ -140,6 +141,10 @@ public class ShardAllocator extends UntypedActor {
logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );
+
+ } else {
+// logger.debug("No new shard for queue {} counterValue {} of max {}",
+// queueName, counterValue, qakkaFig.getMaxShardSize() );
}
} catch ( Throwable t ) {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index 20bf608..e2c5c2c 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -241,9 +241,9 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
}
}
- if ( ret.isEmpty() ) {
- logger.info( "Requested {} but queue '{}' is empty", count, queueName);
- }
+// if ( ret.isEmpty() ) {
+// logger.info( "Requested {} but queue '{}' is empty", count, queueName);
+// }
return ret;
} finally {
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
index ee4bab2..36175c5 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java
@@ -78,19 +78,35 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
class InMemoryCount {
long baseCount;
+ final AtomicLong totalInMemoryCount = new AtomicLong( 0L ); // for testing using only in-memory counter
final AtomicLong increment = new AtomicLong( 0L );
final AtomicLong decrement = new AtomicLong( 0L );
InMemoryCount( long baseCount ) {
this.baseCount = baseCount;
}
- public AtomicLong getIncrement() {
- return increment;
+ public void increment( long inc ) {
+ increment.addAndGet( inc );
+ totalInMemoryCount.addAndGet( inc );
}
- public AtomicLong getDecrement() {
- return decrement;
+ public void decrement( long dec ) {
+ decrement.addAndGet( dec );
+ totalInMemoryCount.addAndGet( -dec );
+ }
+ public long getIncrement() {
+ return increment.get();
+ }
+ public long getDecrement() {
+ return decrement.get();
+ }
+ public void clearDeltas() {
+ increment.set( 0L );
+ decrement.set( 0L );
}
public long value() {
+
+ // return totalInMemoryCount.get(); // for testing using just in-memory counter:
+
return baseCount + increment.get() - decrement.get();
}
void setBaseCount( long baseCount ) {
@@ -106,7 +122,7 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
this.cassandraConfig = cassandraConfig;
- this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter();
+ this.maxChangesBeforeSave = qakkaFig.getMessageCounterMaxInMemory();
this.cassandraClient = cassandraClient;
}
@@ -139,13 +155,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
InMemoryCount inMemoryCount = inMemoryCounters.get( key );
synchronized ( inMemoryCount ) {
- inMemoryCount.getIncrement().addAndGet( increment );
-
- //logger.info("Incremented Count for queue {} type {} = {}",
- //queueName, type, getCounterValue( queueName, type ));
-
+ inMemoryCount.increment( increment );
saveIfNeeded( queueName, type );
}
+
+ if ( logger.isDebugEnabled() ) {
+ long value = inMemoryCounters.get( key ).value();
+ if (value <= 0) {
+ logger.debug( "Queue {} type {} decremented count = {}", queueName, type, value );
+ }
+ }
}
@@ -172,14 +191,16 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
InMemoryCount inMemoryCount = inMemoryCounters.get( key );
synchronized ( inMemoryCount ) {
-
- inMemoryCount.getDecrement().addAndGet( decrement );
-
- //logger.info("Decremented Count for queue {} type {} = {}",
- //queueName, type, getCounterValue( queueName, type ));
-
+ inMemoryCount.decrement( decrement );
saveIfNeeded( queueName, type );
}
+
+ if ( logger.isDebugEnabled() ) {
+ long value = inMemoryCounters.get( key ).value();
+ if (value <= 0) {
+ logger.debug( "Queue {} type {} incremented count = {}", queueName, type, value );
+ }
+ }
}
@@ -194,14 +215,14 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
if ( value == null ) {
throw new NotFoundException(
- MessageFormat.format( "No counter found for queue {0} type {1}",
- queueName, type ));
+ MessageFormat.format( "No counter found for queue {0} type {1}", queueName, type ));
} else {
inMemoryCounters.put( key, new InMemoryCount( value ));
}
}
- return inMemoryCounters.get( key ).value();
+ long value = inMemoryCounters.get( key ).value();
+ return value;
}
@@ -253,15 +274,18 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat
if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {
- long totalIncrement = inMemoryCount.getIncrement().get();
+ long totalIncrement = inMemoryCount.getIncrement();
incrementCounterInStorage( queueName, type, totalIncrement );
- long totalDecrement = inMemoryCount.getDecrement().get();
+ long totalDecrement = inMemoryCount.getDecrement();
decrementCounterInStorage( queueName, type, totalDecrement );
- inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
- inMemoryCount.getIncrement().set( 0L );
- inMemoryCount.getDecrement().set( 0L );
+ long baseCount = retrieveCounterFromStorage( queueName, type );
+
+ logger.debug("Writing queue counter {} type {} to storage count = {}", queueName, type, baseCount );
+
+ inMemoryCount.setBaseCount( baseCount );
+ inMemoryCount.clearDeltas();
numChanges.set( 0 );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
index 708132c..a174dd0 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/QueueMessageSerializationImpl.java
@@ -179,7 +179,7 @@ public class QueueMessageSerializationImpl implements QueueMessageSerialization
Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
Shard shard = shardStrategy.selectShard(
- queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId );
+ queueName, region, shardType, queueMessageId );
shardId = shard.getShardId();
} else {
shardId = shardIdOrNull;
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
index f303f43..276498d 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/sharding/impl/ShardCounterSerializationImpl.java
@@ -94,7 +94,7 @@ public class ShardCounterSerializationImpl implements ShardCounterSerialization
public ShardCounterSerializationImpl(
CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
this.cassandraConfig = cassandraConfig;
- this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
+ this.maxInMemoryIncrement = qakkaFig.getShardCounterMaxInMemory();
this.cassandraClient = cassandraClient;
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/log4j.properties b/stack/corepersistence/queue/src/test/resources/log4j.properties
index c7d53a3..d542096 100644
--- a/stack/corepersistence/queue/src/test/resources/log4j.properties
+++ b/stack/corepersistence/queue/src/test/resources/log4j.properties
@@ -24,7 +24,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n
log4j.logger.org.apache.cassandra=WARN
log4j.logger.org.glassfish=WARN
-log4j.logger.org.apache.usergrid=INFO
-#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
-#log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
+log4j.logger.org.apache.usergrid.persistence.qakka=INFO
+log4j.logger.org.apache.usergrid.persistence.queue=INFO
log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO
http://git-wip-us.apache.org/repos/asf/usergrid/blob/f8c3a2dd/stack/corepersistence/queue/src/test/resources/qakka.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties
index 95b2509..464b48d 100644
--- a/stack/corepersistence/queue/src/test/resources/qakka.properties
+++ b/stack/corepersistence/queue/src/test/resources/qakka.properties
@@ -48,8 +48,9 @@ queue.shard.max.size=10
queue.shard.allocation.check.frequency.millis=100
queue.shard.allocation.advance.time.millis=200
-queue.max.inmemory.shard.counter = 100
-queue.max.inmemory.max.message.changes=3
+# set low for testing purposes
+queue.shard.counter.max-in-memory=10
+queue.message.counter.max-in-memory=10
queue.long.polling.time.millis=2000