You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by sn...@apache.org on 2016/10/05 14:04:44 UTC

[3/5] usergrid git commit: Logging improvements

Logging improvements


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/1f28e2ab
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/1f28e2ab
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/1f28e2ab

Branch: refs/heads/usergrid-1318-queue
Commit: 1f28e2abc46fe325bd861729f83541809cfc9ff1
Parents: 81233b0
Author: Dave Johnson <sn...@apache.org>
Authored: Wed Oct 5 09:59:20 2016 -0400
Committer: Dave Johnson <sn...@apache.org>
Committed: Wed Oct 5 09:59:20 2016 -0400

----------------------------------------------------------------------
 .../impl/DistributedQueueServiceImpl.java       | 47 ++++++++++++++------
 1 file changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/1f28e2ab/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index e24bdb4..af71247 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 
+import java.lang.reflect.Method;
 import java.util.*;
 import java.util.concurrent.TimeUnit;
 
@@ -66,6 +67,9 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
         this.queueManager = queueManager;
         this.qakkaFig = qakkaFig;
         this.messageCounterSerialization = messageCounterSerialization;
+
+
+
     }
 
 
@@ -74,20 +78,34 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
         try {
             List<String> queues = queueManager.getListOfQueues();
-            for ( String queueName : queues ) {
+            for (String queueName : queues) {
                 initQueue( queueName );
             }
-        }catch (InvalidQueryException e){
+        } catch (InvalidQueryException e) {
 
-            if (e.getMessage().contains("unconfigured columnfamily")){
-                logger.info("Unable to initialize queues since system is bootstrapping.  " +
-                    "Queues will be initialized when created");
-            }else{
+            if (e.getMessage().contains( "unconfigured columnfamily" )) {
+                logger.info( "Unable to initialize queues since system is bootstrapping.  " +
+                    "Queues will be initialized when created" );
+            } else {
                 throw e;
             }
 
         }
 
+        StringBuilder logMessage = new StringBuilder();
+        logMessage.append( "DistributedQueueServiceImpl initialized with config:\n" );
+        Method[] methods = qakkaFig.getClass().getMethods();
+        for ( Method method : methods ) {
+            if ( method.getName().startsWith("get")) {
+                try {
+                    logMessage.append("   ")
+                        .append( method.getName().substring(3) )
+                        .append(" = ")
+                        .append( method.invoke( qakkaFig ).toString() ).append("\n");
+                } catch (Exception ignored ) {}
+            }
+        }
+        logger.info( logMessage.toString() );
     }
 
 
@@ -203,7 +221,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
             ret.addAll( getNextMessagesInternal( queueName, count ));
 
             if ( ret.size() < count ) {
-                try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 5 ); } catch (Exception ignored) {}
+                try { Thread.sleep( qakkaFig.getLongPollTimeMillis() / 2 ); } catch (Exception ignored) {}
             }
         }
 
@@ -226,7 +244,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
         int retries = 0;
 
         QueueGetRequest request = new QueueGetRequest( queueName, count );
-        while ( retries++ < maxRetries ) {
+        while ( ++retries < maxRetries ) {
             try {
                 Timeout t = new Timeout( qakkaFig.getGetTimeoutSeconds(), TimeUnit.SECONDS );
 
@@ -243,17 +261,18 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                         QueueGetResponse qprm = (QueueGetResponse)response;
                         if ( qprm.isSuccess() ) {
                             if (retries > 1) {
-                                logger.debug( "getNextMessage SUCCESS after {} retries", retries );
+                                logger.debug( "getNextMessage {} SUCCESS after {} retries", queueName, retries );
                             }
                         }
+                        logger.debug("Returning queue {} messages {}", queueName, qprm.getQueueMessages().size());
                         return qprm.getQueueMessages();
 
 
                     } else if ( response != null  ) {
-                        logger.debug("ERROR RESPONSE (1) popping queue, retrying {}", retries );
+                        logger.debug("ERROR RESPONSE (1) popping queue {}, retrying {}", queueName, retries );
 
                     } else {
-                        logger.debug("TIMEOUT popping to queue, retrying {}", retries );
+                        logger.debug("TIMEOUT popping from queue {}, retrying {}", queueName, retries );
                     }
 
                 } else if ( responseObject instanceof ClientActor.ErrorResponse ) {
@@ -263,16 +282,16 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
                         errorResponse.getMessage(), retries );
 
                 } else {
-                    logger.debug("UNKNOWN RESPONSE popping queue, retrying {}", retries );
+                    logger.debug("UNKNOWN RESPONSE popping queue {}, retrying {}", queueName, retries );
                 }
 
             } catch ( Exception e ) {
-                logger.debug("ERROR popping to queue, retrying " + retries, e );
+                logger.debug("ERROR popping to queue " + queueName + " retrying " + retries, e );
             }
         }
 
         throw new QakkaRuntimeException(
-                "Error getting from queue " + queueName + " after " + retries );
+                "Error getting from queue " + queueName + " after " + retries + " tries");
     }