You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@usergrid.apache.org by mr...@apache.org on 2016/09/17 18:51:08 UTC
usergrid git commit: Update bootstrap/init logic to avoid race
conditions in writing a new queue to the database.
Repository: usergrid
Updated Branches:
refs/heads/usergrid-1318-queue 03eb31246 -> a0b0f717b
Update bootstrap/init logic to avoid race conditions in writing a new queue to the database.
Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/a0b0f717
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/a0b0f717
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/a0b0f717
Branch: refs/heads/usergrid-1318-queue
Commit: a0b0f717bb1a23e4879282726045eba12514850d
Parents: 03eb312
Author: Michael Russo <mr...@apigee.com>
Authored: Sat Sep 17 11:50:34 2016 -0700
Committer: Michael Russo <mr...@apigee.com>
Committed: Sat Sep 17 11:50:34 2016 -0700
----------------------------------------------------------------------
.../usergrid/persistence/index/IndexAlias.java | 4 ++--
.../qakka/core/impl/QueueManagerImpl.java | 7 ++++---
.../qakka/distributed/actors/QueueSender.java | 2 +-
.../impl/DistributedQueueServiceImpl.java | 21 ++++++++++++++++++--
4 files changed, 26 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
index 9952aa8..e296895 100644
--- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
+++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexAlias.java
@@ -17,13 +17,13 @@
package org.apache.usergrid.persistence.index;
-import org.apache.usergrid.persistence.index.IndexFig;
+import java.io.Serializable;
/**
* Abstraction for Index alias names
*/
-public interface IndexAlias{
+public interface IndexAlias extends Serializable{
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
index bbb46a8..789edd4 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/core/impl/QueueManagerImpl.java
@@ -57,9 +57,7 @@ public class QueueManagerImpl implements QueueManager {
}
@Override
- public void createQueue(Queue queue) {
-
- queueSerialization.writeQueue(queue.toDatabaseQueue());
+ public void createQueue(Queue queue) {
List<String> regions = new ArrayList<>();
@@ -86,6 +84,9 @@ public class QueueManagerImpl implements QueueManager {
shardSerialization.createShard( inflight );
}
+ // only write the existence of a queue to the database if its dependent initial shards have been written
+ queueSerialization.writeQueue(queue.toDatabaseQueue());
+
distributedQueueService.initQueue( queue.getName() );
distributedQueueService.refreshQueue( queue.getName() );
}
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
index 8bd733b..03d1216 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/actors/QueueSender.java
@@ -167,7 +167,7 @@ public class QueueSender extends UntypedActor {
}
}
- throw new QakkaRuntimeException( "Error adding to queue after " + retries );
+ throw new QakkaRuntimeException( "Error adding to queue after " + retries + " retries" );
} finally {
timer.stop();
http://git-wip-us.apache.org/repos/asf/usergrid/blob/a0b0f717/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
index ec667e6..4737347 100644
--- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
+++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/distributed/impl/DistributedQueueServiceImpl.java
@@ -22,6 +22,7 @@ package org.apache.usergrid.persistence.qakka.distributed.impl;
import akka.actor.ActorRef;
import akka.pattern.Patterns;
import akka.util.Timeout;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import org.apache.usergrid.persistence.actorsystem.ActorSystemManager;
@@ -67,14 +68,29 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public void init() {
- for ( String queueName : queueManager.getListOfQueues() ) {
- initQueue( queueName );
+
+ try {
+ List<String> queues = queueManager.getListOfQueues();
+ for ( String queueName : queues ) {
+ initQueue( queueName );
+ }
+ }catch (InvalidQueryException e){
+
+ if (e.getMessage().contains("unconfigured columnfamily")){
+ logger.info("Unable to initialize queues since system is bootstrapping. " +
+ "Queues will be initialized when created");
+ }else{
+ throw e;
+ }
+
}
+
}
@Override
public void initQueue(String queueName) {
+ logger.info("Initializing queue: {}", queueName);
QueueInitRequest request = new QueueInitRequest( queueName );
ActorRef clientActor = actorSystemManager.getClientActor();
clientActor.tell( request, null );
@@ -91,6 +107,7 @@ public class DistributedQueueServiceImpl implements DistributedQueueService {
@Override
public void refreshQueue(String queueName) {
+ logger.info("Refreshing queue: {}", queueName);
QueueRefreshRequest request = new QueueRefreshRequest( queueName );
ActorRef clientActor = actorSystemManager.getClientActor();
clientActor.tell( request, null );