You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@brooklyn.apache.org by al...@apache.org on 2014/11/05 01:37:26 UTC
[2/6] git commit: MongoDB: fix race in adding shards
MongoDB: fix race in adding shards
- if adding the shard fails, then try again
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/2995857b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/2995857b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/2995857b
Branch: refs/heads/master
Commit: 2995857ba93c244035157fc3f7fbb5d48107657f
Parents: 62b3ff7
Author: Aled Sage <al...@gmail.com>
Authored: Tue Nov 4 00:08:23 2014 +0000
Committer: Aled Sage <al...@gmail.com>
Committed: Tue Nov 4 00:09:07 2014 +0000
----------------------------------------------------------------------
.../nosql/mongodb/MongoDBReplicaSetImpl.java | 9 +-
.../sharding/MongoDBShardClusterImpl.java | 111 ++++++++++++++++---
2 files changed, 101 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2995857b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
index 6d09b4c..a106ec1 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/MongoDBReplicaSetImpl.java
@@ -218,6 +218,7 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
* seconds time (in the hope that next time the primary will be available).
*/
private void addSecondaryWhenPrimaryIsNonNull(final MongoDBServer secondary) {
+ // TODO Don't use executor, use ExecutionManager
executor.submit(new Runnable() {
@Override
public void run() {
@@ -377,12 +378,18 @@ public class MongoDBReplicaSetImpl extends DynamicClusterImpl implements MongoDB
// - if the set is being stopped forever it's irrelevant
// - if the set might be restarted I think it just inconveniences us
// Terminate the executor immediately.
- // Note that after this the executor will not run if the set is restarted.
+ // TODO Note that after this the executor will not run if the set is restarted.
executor.shutdownNow();
super.stop();
setAttribute(Startable.SERVICE_UP, false);
}
+ @Override
+ public void onManagementStopped() { // management-stopped callback; chains to the superclass first
+ super.onManagementStopped();
+ executor.shutdownNow(); // same shutdown that stop() performs, repeated for this callback
+ }
+
public static class MemberTrackingPolicy extends AbstractMembershipTrackingPolicy {
@Override protected void onEntityChange(Entity member) {
// Ignored
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2995857b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
----------------------------------------------------------------------
diff --git a/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
index 5b465ca..47c32ae 100644
--- a/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
+++ b/software/nosql/src/main/java/brooklyn/entity/nosql/mongodb/sharding/MongoDBShardClusterImpl.java
@@ -21,6 +21,10 @@ package brooklyn.entity.nosql.mongodb.sharding;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -37,15 +41,30 @@ import brooklyn.event.SensorEventListener;
import brooklyn.location.Location;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.text.Strings;
+import brooklyn.util.time.Duration;
+import brooklyn.util.time.Time;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
public class MongoDBShardClusterImpl extends DynamicClusterImpl implements MongoDBShardCluster {
private static final Logger LOG = LoggerFactory.getLogger(MongoDBShardClusterImpl.class);
+
// TODO: Need to use attributes for this in order to support brooklyn restart
private Set<Entity> addedMembers = Sets.newConcurrentHashSet();
+ // TODO: Need to use attributes for this in order to support brooklyn restart
+ private Set<Entity> addingMembers = Sets.newConcurrentHashSet();
+
+ /**
+ * For shard addition and removal.
+ * Used for retrying.
+ *
+ * TODO Should use ExecutionManager.
+ */
+ private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+
@Override
protected EntitySpec<?> getMemberSpec() {
EntitySpec<?> result = super.getMemberSpec();
@@ -72,33 +91,89 @@ public class MongoDBShardClusterImpl extends DynamicClusterImpl implements Mongo
addShards();
}
});
-
+ }
+
+ @Override
+ public void stop() {
+ // TODO shutdownNow() is permanent and the final executor is never recreated, so shard additions will not run if this cluster is restarted.
+ executor.shutdownNow();
+ super.stop();
+ }
+
+ @Override
+ public void onManagementStopped() { // management-stopped callback; chains to the superclass first
+ super.onManagementStopped();
+ executor.shutdownNow(); // stop() does this too; presumably repeated for entities unmanaged without being stopped
}
protected void addShards() {
MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
- if (router == null)
+ if (router == null) {
+ if (LOG.isTraceEnabled()) LOG.trace("Not adding shards because no running router in {}", this);
return;
-
- MongoDBClientSupport client;
- try {
- client = MongoDBClientSupport.forServer(router);
- } catch (UnknownHostException e) {
- throw Exceptions.propagate(e);
}
+
for (Entity member : this.getMembers()) {
- if (member.getAttribute(Startable.SERVICE_UP) && !addedMembers.contains(member)) {
- MongoDBServer primary = member.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY);
- if (primary != null) {
- String addr = Strings.removeFromStart(primary.getAttribute(MongoDBServer.MONGO_SERVER_ENDPOINT), "http://");
- String replicaSetURL = ((MongoDBReplicaSet) member).getName() + "/" + addr;
- LOG.info("Using {} to add shard URL {}...", router, replicaSetURL);
- client.addShardToRouter(replicaSetURL);
- addedMembers.add(member);
+ if (member.getAttribute(Startable.SERVICE_UP) && !addingMembers.contains(member)) {
+ LOG.info("{} adding shard {}", new Object[] {MongoDBShardClusterImpl.this, member});
+ addingMembers.add(member);
+ addShardAsync(member);
+ }
+ }
+ }
+
+ /**
+ * Asynchronously registers the given replica set as a shard through a running router.
+ * <p>
+ * The work is submitted to {@code executor}. An attempt fails when no router is running, when the
+ * replica set has no primary yet, or when the router reports that the add did not succeed. A failed
+ * attempt is retried after 3 seconds for as long as less than 20 minutes have elapsed since this
+ * method was called; after that a warning is logged and the replica set is removed from
+ * {@code addingMembers}, so a later call to {@code addShards()} may start over.
+ *
+ * @param replicaSet the shard to add; it is cast to {@code MongoDBReplicaSet} to build the shard URL
+ */
+ protected void addShardAsync(final Entity replicaSet) {
+ final Duration timeout = Duration.minutes(20);
+ final Stopwatch stopwatch = Stopwatch.createStarted();
+ final AtomicInteger attempts = new AtomicInteger();
+
+ // TODO Don't use executor, use ExecutionManager; but following pattern in MongoDBReplicaSetImpl for now.
+ executor.submit(new Runnable() {
+ @Override
+ public void run() {
+ boolean reschedule;
+ MongoDBRouter router = getParent().getAttribute(MongoDBShardedDeployment.ROUTER_CLUSTER).getAttribute(MongoDBRouterCluster.ANY_RUNNING_ROUTER);
+ if (router == null) {
+ // Inside the Runnable a plain "this" is the task itself; qualify it so the cluster is logged.
+ LOG.debug("Rescheduling adding shard {} because no running router for cluster {}", replicaSet, MongoDBShardClusterImpl.this);
+ reschedule = true;
} else {
- LOG.debug("{} not set for member {}); not adding shart to router {}", new Object[] {MongoDBReplicaSet.PRIMARY_ENTITY, member, router});
+ MongoDBClientSupport client;
+ try {
+ client = MongoDBClientSupport.forServer(router);
+ } catch (UnknownHostException e) {
+ // The exception ends this task without a retry, so release the claim first;
+ // otherwise addShards() would skip this replica set from now on.
+ addingMembers.remove(replicaSet);
+ throw Exceptions.propagate(e);
+ }
+
+ MongoDBServer primary = replicaSet.getAttribute(MongoDBReplicaSet.PRIMARY_ENTITY);
+ if (primary != null) {
+ String addr = Strings.removeFromStart(primary.getAttribute(MongoDBServer.MONGO_SERVER_ENDPOINT), "http://");
+ String replicaSetURL = ((MongoDBReplicaSet) replicaSet).getName() + "/" + addr;
+ boolean added = client.addShardToRouter(replicaSetURL);
+ if (added) {
+ LOG.info("{} added shard {} via {}", new Object[] {MongoDBShardClusterImpl.this, replicaSetURL, router});
+ addedMembers.add(replicaSet);
+ reschedule = false;
+ } else {
+ LOG.debug("Rescheduling addition of shard {} because add failed via router {}", replicaSetURL, router);
+ reschedule = true;
+ }
+ } else {
+ LOG.debug("Rescheduling addition of shard {} because primary is null", replicaSet);
+ reschedule = true;
+ }
+ }
+
+ if (reschedule) {
+ int numAttempts = attempts.incrementAndGet();
+ // Retry after every failed attempt, including the first, until the timeout has elapsed.
+ // (Requiring numAttempts > 1 here meant the first failure always fell through to "aborting".)
+ if (timeout.toMilliseconds() > stopwatch.elapsed(TimeUnit.MILLISECONDS)) {
+ executor.schedule(this, 3, TimeUnit.SECONDS);
+ } else {
+ LOG.warn("Timeout after {} attempts ({}) adding shard {}; aborting",
+ new Object[] {numAttempts, Time.makeTimeStringRounded(stopwatch), replicaSet});
+ addingMembers.remove(replicaSet);
+ }
+ }
}
- }
+ });
}
}