You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/05 01:37:26 UTC

[2/6] git commit: MongoDB: fix race in adding shards

MongoDB: fix race in adding shards

- if adding the shard fails, then try again


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/2995857b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/2995857b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/2995857b

Branch: refs/heads/master
Commit: 2995857ba93c244035157fc3f7fbb5d48107657f
Parents: 62b3ff7
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 4 00:08:23 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 4 00:09:07 2014 +0000

----------------------------------------------------------------------
 .../nosql/mongodb/MongoDBReplicaSetImpl.java    |   9 +-
 .../sharding/MongoDBShardClusterImpl.java       | 111 ++++++++++++++++---
 2 files changed, 101 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2995857b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
index 6d09b4c..a106ec1 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
@@ -218,6 +218,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
      * seconds time (in the hope that next time the primary will be available).
      */
     private void addSecondaryWhenPrimaryIsNonNull(final MongoDBServer secondary) {
+        // TODO Don't use executor, use ExecutionManager
         executor.submit(new Runnable() {
             @Override
             public void run() {
@@ -377,12 +378,18 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
         //  - if the set is being stopped forever it's irrelevant
         //  - if the set might be restarted I think it just inconveniences us
         // Terminate the executor immediately.
-        // Note that after this the executor will not run if the set is restarted.
+        // TODO Note that after this the executor will not run if the set is restarted.
         executor.shutdownNow();
         super.stop();
         setAttribute(Startable.SERVICE_UP, false);
     }
 
+    @Override
+    public void onManagementStopped() {
+        super.onManagementStopped();
+        executor.shutdownNow();
+    }
+    
     public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
         @Override protected void onEntityChange(Entity member) {
             // Ignored

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2995857b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
index 5b465ca..47c32ae 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
@@ -21,6 +21,10 @@ package brooklyn.entity.nosql.mongodb.sharding;
 import java.net.UnknownHostException;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -37,15 +41,30 @@ import brooklyn.event.SensorEventListener;
 import brooklyn.location.Location;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
 
+import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
 
 public class MongoDBShardClusterImpl extends DynamicClusterImpl implements MongoDBShardCluster {
 
     private static final Logger LOG = LoggerFactory.getLogger(MongoDBShardClusterImpl.class);
+    
     // TODO: Need to use attributes for this in order to support brooklyn restart 
     private Set<Entity> addedMembers = Sets.newConcurrentHashSet();
 
+    // TODO: Need to use attributes for this in order to support brooklyn restart 
+    private Set<Entity> addingMembers = Sets.newConcurrentHashSet();
+
+    /**
+     * For shard addition and removal.
+     * Used for retrying.
+     * 
+     * TODO Should use ExecutionManager.
+     */
+    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
     @Override
     protected EntitySpec<?> getMemberSpec() {
         EntitySpec<?> result = super.getMemberSpec();
@@ -72,33 +91,89 @@ public class MongoDBShardClusterImpl extends DynamicClusterImpl implements Mongo
                     addShards();
             }
         });
-        
+    }
+
+    @Override
+    public void stop() {
+        // TODO Note that after this the executor will not run if the cluster is restarted.
+        executor.shutdownNow();
+        super.stop();
+    }
+    
+    @Override
+    public void onManagementStopped() {
+        super.onManagementStopped();
+        executor.shutdownNow();
     }
 
     protected void addShards() {
         MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
-        if (router == null)
+        if (router == null) {
+            if (LOG.isTraceEnabled()) LOG.trace("Not adding shards because no running router in {}", this);
             return;
-        
-        MongoDBClientSupport client;
-        try {
-            client = MongoDBClientSupport.forServer(router);
-        } catch (UnknownHostException e) {
-            throw Exceptions.propagate(e);
         }
+        
         for (Entity member : this.getMembers()) {
-            if (member.getAttribute(Startable.SERVICE_UP) && !addedMembers.contains(member)) {
-                MongoDBServer primary = member.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY);
-                if (primary != null) {
-                    String addr = Strings.removeFromStart(primary.getAttribute(MongoDBServer.MONGO_SERVER_ENDPOINT), "http://");
-                    String replicaSetURL = ((MongoDBReplicaSet) member).getName() + "/" + addr;
-                    LOG.info("Using {} to add shard URL {}...", router, replicaSetURL);
-                    client.addShardToRouter(replicaSetURL);
-                    addedMembers.add(member);
+            if (member.getAttribute(Startable.SERVICE_UP) && !addingMembers.contains(member)) {
+                LOG.info("{} adding shard {}", new Object[] {MongoDBShardClusterImpl.this, member});
+                addingMembers.add(member);
+                addShardAsync(member);
+            }
+        }
+    }
+    
+    protected void addShardAsync(final Entity replicaSet) {
+        final Duration timeout = Duration.minutes(20);
+        final Stopwatch stopwatch = Stopwatch.createStarted();
+        final AtomicInteger attempts = new AtomicInteger();
+        
+        // TODO Don't use executor, use ExecutionManager; but following pattern in MongoDBReplicaSetImpl for now.
+        executor.submit(new Runnable() {
+            @Override
+            public void run() {
+                boolean reschedule;
+                MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
+                if (router == null) {
+                    LOG.debug("Rescheduling adding shard {} because no running router for cluster {}", replicaSet, this);
+                    reschedule = true;
                 } else {
-                    LOG.debug("{} not set for member {}); not adding shart to router {}", new Object[] {MongoDBReplicaSet.PRIMARY_ENTITY, member, router});
+                    MongoDBClientSupport client;
+                    try {
+                        client = MongoDBClientSupport.forServer(router);
+                    } catch (UnknownHostException e) {
+                        throw Exceptions.propagate(e);
+                    }
+                    
+                    MongoDBServer primary = replicaSet.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY);
+                    if (primary != null) {
+                        String addr = Strings.removeFromStart(primary.getAttribute(MongoDBServer.MONGO_SERVER_ENDPOINT), "http://");
+                        String replicaSetURL = ((MongoDBReplicaSet) replicaSet).getName() + "/" + addr;
+                        boolean added = client.addShardToRouter(replicaSetURL);
+                        if (added) {
+                            LOG.info("{} added shard {} via {}", new Object[] {MongoDBShardClusterImpl.this, replicaSetURL, router});
+                            addedMembers.add(replicaSet);
+                            reschedule = false;
+                        } else {
+                            LOG.debug("Rescheduling addition of shard {} because add failed via router {}", replicaSetURL, router);
+                            reschedule = true;
+                        }
+                    } else {
+                        LOG.debug("Rescheduling addition of shard {} because primary is null", replicaSet);
+                        reschedule = true;
+                    }
+                }
+                
+                if (reschedule) {
+                    int numAttempts = attempts.incrementAndGet();
+                    if (numAttempts > 1 && timeout.toMilliseconds() > stopwatch.elapsed(TimeUnit.MILLISECONDS)) {
+                        executor.schedule(this, 3, TimeUnit.SECONDS);
+                    } else {
+                        LOG.warn("Timeout after {} attempts ({}) adding shard {}; aborting", 
+                                new Object[] {numAttempts, Time.makeTimeStringRounded(stopwatch), replicaSet});
+                        addingMembers.remove(replicaSet);
+                    }
                 }
             }
-        }
+        });
     }
 }