You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/09/17 18:51:08 UTC

usergrid git commit: Update bootstrap/init logic to avoid race conditions in writing a new queue to the database.

Repository: usergrid
Updated Branches:
  refs/heads/usergrid-1318-queue 03eb31246 -> a0b0f717b


Update bootstrap/init logic to avoid race conditions in writing a new queue to the database.


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a0b0f717
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a0b0f717
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a0b0f717

Branch: refs/heads/usergrid-1318-queue
Commit: a0b0f717bb1a23e4879282726045eba12514850d
Parents: 03eb312
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Sep 17 11:50:34 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Sep 17 11:50:34 2016 -0700

----------------------------------------------------------------------
 .../usergrid/persistence/index/IndexAlias.java  |  4 ++--
 .../qakka/core/impl/QueueManagerImpl.java       |  7 ++++---
 .../qakka/distributed/actors/QueueSender.java   |  2 +-
 .../impl/DistributedQueueServiceImpl.java       | 21 ++++++++++++++++++--
 4 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
index 9952aa8..e296895 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
@@ -17,13 +17,13 @@
 package org.apache.usergrid.persistence.index;
 
 
-import org.apache.usergrid.persistence.index.IndexFig;
+import java.io.Serializable;
 
 
 /**
  * Abstraction for Index alias names
  */
-public interface IndexAlias{
+public interface IndexAlias extends Serializable{
 
 
 

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index bbb46a8..789edd4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -57,9 +57,7 @@ public class QueueManagerImpl implements QueueManager {
     }
 
     @Override
-    public void createQueue(Queue queue) {
-
-        queueSerialization.writeQueue(queue.toDatabaseQueue());
+    public void  createQueue(Queue queue) {
 
         List<String> regions = new ArrayList<>();
 
@@ -86,6 +84,9 @@ public class QueueManagerImpl implements QueueManager {
             shardSerialization.createShard( inflight );
         }
 
+        // only write the existence of a queue to the database if its dependent initial shards have been written
+        queueSerialization.writeQueue(queue.toDatabaseQueue());
+
         distributedQueueService.initQueue( queue.getName() );
         distributedQueueService.refreshQueue( queue.getName() );
     }

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 8bd733b..03d1216 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -167,7 +167,7 @@ public class QueueSender extends UntypedActor {
                 }
             }
 
-            throw new QakkaRuntimeException( "Error adding to queue after " + retries );
+            throw new QakkaRuntimeException( "Error adding to queue after " + retries + " retries" );
 
         } finally {
             timer.stop();

http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index ec667e6..4737347 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.qakka.distributed.impl;
 import akka.actor.ActorRef;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
@@ -67,14 +68,29 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
     @Override
     public void init() {
-        for ( String queueName : queueManager.getListOfQueues() ) {
-            initQueue( queueName );
+
+        try {
+            List<String> queues = queueManager.getListOfQueues();
+            for ( String queueName : queues ) {
+                initQueue( queueName );
+            }
+        }catch (InvalidQueryException e){
+
+            if (e.getMessage().contains("unconfigured columnfamily")){
+                logger.info("Unable to initialize queues since system is bootstrapping.  " +
+                    "Queues will be initialized when created");
+            }else{
+                throw e;
+            }
+
         }
+
     }
 
 
     @Override
     public void initQueue(String queueName) {
+        logger.info("Initializing queue: {}", queueName);
         QueueInitRequest request = new QueueInitRequest( queueName );
         ActorRef clientActor = actorSystemManager.getClientActor();
         clientActor.tell( request, null );
@@ -91,6 +107,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
 
     @Override
     public void refreshQueue(String queueName) {
+        logger.info("Refreshing queue: {}", queueName);
         QueueRefreshRequest request = new QueueRefreshRequest( queueName );
         ActorRef clientActor = actorSystemManager.getClientActor();
         clientActor.tell( request, null );