You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/05 14:04:44 UTC
[3/5] usergrid git commit: Logging improvements
Logging improvements
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1f28e2ab
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1f28e2ab
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1f28e2ab
Branch: refs/heads/usergrid-1318-queue
Commit: 1f28e2abc46fe325bd861729f83541809cfc9ff1
Parents: 81233b0
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 09:59:20 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 09:59:20 2016 -0400
----------------------------------------------------------------------
.../impl/DistributedQueueServiceImpl.java | 47 ++++++++++++++------
1 file changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/1f28e2ab/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index e24bdb4..af71247 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Future;
+import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -66,6 +67,9 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
this.queueManager = queueManager;
this.qakkaFig = qakkaFig;
this.messageCounterSerialization = messageCounterSerialization;
+
+
+
}
@@ -74,20 +78,34 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
try {
List<String> queues = queueManager.getListOfQueues();
- for ( String queueName : queues ) {
+ for (String queueName : queues) {
initQueue( queueName );
}
- }catch (InvalidQueryException e){
+ } catch (InvalidQueryException e) {
- if (e.getMessage().contains("unconfigured columnfamily")){
- logger.info("Unable to initialize queues since system is bootstrapping. " +
- "Queues will be initialized when created");
- }else{
+ if (e.getMessage().contains( "unconfigured columnfamily" )) {
+ logger.info( "Unable to initialize queues since system is bootstrapping. " +
+ "Queues will be initialized when created" );
+ } else {
throw e;
}
}
+ StringBuilder logMessage = new StringBuilder();
+ logMessage.append( "DistributedQueueServiceImpl initialized with config:\n" );
+ Method[] methods = qakkaFig.getClass().getMethods();
+ for ( Method method : methods ) {
+ if ( method.getName().startsWith("get")) {
+ try {
+ logMessage.append(" ")
+ .append( method.getName().substring(3) )
+ .append(" = ")
+ .append( method.invoke( qakkaFig ).toString() ).append("\n");
+ } catch (Exception ignored ) {}
+ }
+ }
+ logger.info( logMessage.toString() );
}
@@ -203,7 +221,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
ret.addAll( getNextMessagesInternal( queueName, count ));
if ( ret.size() < count ) {
- try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 5 ); } catch (Exception ignored) {}
+ try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {}
}
}
@@ -226,7 +244,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
int retries = 0;
QueueGetRequest request = new QueueGetRequest( queueName, count );
- while ( retries++ < maxRetries ) {
+ while ( ++retries < maxRetries ) {
try {
Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
@@ -243,17 +261,18 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
QueueGetResponse qprm = (QueueGetResponse)response;
if ( qprm.isSuccess() ) {
if (retries > 1) {
- logger.debug( "getNextMessage SUCCESS after {} retries", retries );
+ logger.debug( "getNextMessage {} SUCCESS after {} retries", queueName, retries );
}
}
+ logger.debug("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size());
return qprm.getQueueMessages();
} else if ( response != null ) {
- logger.debug("ERROR RESPONSE (1) popping queue, retrying {}", retries );
+ logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, retries );
} else {
- logger.debug("TIMEOUT popping to queue, retrying {}", retries );
+ logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, retries );
}
} else if ( responseObject instanceof ClientActor.ErrorResponse ) {
@@ -263,16 +282,16 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
errorResponse.getMessage(), retries );
} else {
- logger.debug("UNKNOWN RESPONSE popping queue, retrying {}", retries );
+ logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, retries );
}
} catch ( Exception e ) {
- logger.debug("ERROR popping to queue, retrying " + retries, e );
+ logger.debug("ERROR popping to queue " + queueName + " retrying " + retries, e );
}
}
throw new QakkaRuntimeException(
- "Error getting from queue " + queueName + " after " + retries );
+ "Error getting from queue " + queueName + " after " + retries + " tries");
}