You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/08 23:11:22 UTC
svn commit: r1444259 -
/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
Author: kturner
Date: Fri Feb 8 22:11:22 2013
New Revision: 1444259
URL: http://svn.apache.org/r1444259
Log:
ACCUMULO-1049 modified master to stop locking tserver nodes and just monitor tserver nodes in zookeeper
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1444259&r1=1444258&r2=1444259&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Fri Feb 8 22:11:22 2013
@@ -22,13 +22,9 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
-import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -49,6 +45,7 @@ import org.apache.accumulo.server.util.t
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.Tracer;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
@@ -57,7 +54,6 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.NotEmptyException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.data.Stat;
public class LiveTServerSet implements Watcher {
@@ -189,26 +185,17 @@ public class LiveTServerSet implements W
}
static class TServerInfo {
- ZooLock lock;
TServerConnection connection;
TServerInstance instance;
- TServerLockWatcher watcher;
- TServerInfo(ZooLock lock, TServerInstance instance, TServerConnection connection, TServerLockWatcher watcher) {
- this.lock = lock;
+ TServerInfo(TServerInstance instance, TServerConnection connection) {
this.connection = connection;
this.instance = instance;
- this.watcher = watcher;
- }
-
- void cleanup() throws InterruptedException, KeeperException {
- lock.tryToCancelAsyncLockOrUnlock();
}
};
// Map from tserver master service to server information
private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
- private HashMap<String,Long> serversToDelete = new HashMap<String,Long>();
public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
this.cback = cback;
@@ -230,7 +217,7 @@ public class LiveTServerSet implements W
public void run() {
scanServers();
}
- }, 0, 1000);
+ }, 0, 5000);
}
public synchronized void scanServers() {
@@ -240,21 +227,11 @@ public class LiveTServerSet implements W
final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
- Iterator<Entry<String,Long>> serversToDelIter = serversToDelete.entrySet().iterator();
- while (serversToDelIter.hasNext()) {
- Entry<String,Long> entry = serversToDelIter.next();
- if (System.currentTimeMillis() - entry.getValue() > 10000) {
- String serverNode = path + "/" + entry.getKey();
- serversToDelIter.remove();
- deleteServerNode(serverNode);
- }
- }
-
- for (String server : getZooCache().getChildren(path)) {
- if (serversToDelete.containsKey(server))
- continue;
-
- checkServer(updates, doomed, path, server, 2);
+ HashSet<String> all = new HashSet<String>(current.keySet());
+ all.addAll(getZooCache().getChildren(path));
+
+ for (String server : all) {
+ checkServer(updates, doomed, path, server);
}
// log.debug("Current: " + current.keySet());
@@ -275,94 +252,71 @@ public class LiveTServerSet implements W
}
}
- private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server,
- int recurse)
- throws TException,
- InterruptedException, KeeperException {
-
- if (recurse == 0)
- return;
+ private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server)
+ throws TException, InterruptedException, KeeperException {
- // See if we have an async lock in place?
TServerInfo info = current.get(server);
- TServerLockWatcher watcher;
- ZooLock lock;
+
final String lockPath = path + "/" + server;
- if (info != null) {
- // yep: get out the lock/watcher so we can check on it
- watcher = info.watcher;
- lock = info.lock;
- } else {
- // nope: create a new lock and watcher
- lock = new ZooLock(lockPath);
- watcher = new TServerLockWatcher();
- lock.lockAsync(watcher, "master".getBytes());
- }
- TServerInstance instance = null;
- // Did we win the lock yet?
- if (!lock.isLocked() && !watcher.gotLock && watcher.failureException == null) {
- // Nope... there's a server out there: is this is a new server?
- if (info == null) {
- // Yep: hold onto the information about this server
- Stat stat = new Stat();
- byte[] lockData = ZooLock.getLockData(lockPath, stat);
- String lockString = new String(lockData == null ? new byte[] {} : lockData);
- if (lockString.length() > 0 && !lockString.equals("master")) {
- ServerServices services = new ServerServices(new String(lockData));
- InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
- InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
- TServerConnection conn = new TServerConnection(addr);
- instance = new TServerInstance(client, stat.getEphemeralOwner());
- info = new TServerInfo(lock, instance, conn, watcher);
- current.put(server, info);
- updates.add(instance);
- } else {
- lock.tryToCancelAsyncLockOrUnlock();
- }
- }
- } else {
- // Yes... there is no server here any more
- lock.tryToCancelAsyncLockOrUnlock();
+ Stat stat = new Stat();
+ byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
+
+ if (lockData == null) {
if (info != null) {
- // a server existed here and went away so delete its node
doomed.add(info.instance);
current.remove(server);
- info.cleanup();
- deleteServerNode(lockPath);
- } else {
- // never knew of this server before... it could be a new server that has not created its lock node yet... watch and see if it creates the node or
- // delete it later if it does not
- List<String> children = ZooReaderWriter.getInstance().getChildren(lockPath, new Watcher() {
- @Override
- public void process(WatchedEvent arg0) {
- if (arg0.getType() == EventType.NodeChildrenChanged) {
- Set<TServerInstance> updates = new HashSet<TServerInstance>();
- Set<TServerInstance> doomed = new HashSet<TServerInstance>();
- try {
- checkServer(updates, doomed, path, server, 2);
- } catch (Exception ex) {
- log.error(ex, ex);
- }
-
- if (!doomed.isEmpty() || !updates.isEmpty())
- cback.update(LiveTServerSet.this, doomed, updates);
- }
- }
- });
-
- if (children.size() > 0) {
- checkServer(updates, doomed, path, server, recurse--);
- } else {
- if (!serversToDelete.containsKey(server))
- serversToDelete.put(server, System.currentTimeMillis());
- }
+ }
+
+ deleteServerNode(path + "/" + server);
+ } else {
+ ServerServices services = new ServerServices(new String(lockData));
+ InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+ InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
+ TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
+
+ if (info == null) {
+ updates.add(instance);
+ current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+ } else if (!info.instance.equals(instance)) {
+ doomed.add(info.instance);
+ updates.add(instance);
+ current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
}
}
}
-
+
@Override
public void process(WatchedEvent event) {
- scanServers();
+
+ // it's important that these events are propagated by ZooCache, because this ensures that when reading zoocache it has already processed the event and cleared
+ // relevant nodes before the code below reads from zoocache
+
+ if (event.getPath() != null) {
+ if (event.getPath().endsWith(Constants.ZTSERVERS)) {
+ scanServers();
+ } else if (event.getPath().contains(Constants.ZTSERVERS)) {
+ int pos = event.getPath().lastIndexOf('/');
+
+ // do this only if ZTSERVERS is the parent
+ if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
+
+ String server = event.getPath().substring(pos + 1);
+
+ final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+ final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+
+ final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+
+ try {
+ checkServer(updates, doomed, path, server);
+ if (!doomed.isEmpty() || !updates.isEmpty())
+ this.cback.update(this, doomed, updates);
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ }
+ }
+ }
}
public synchronized TServerConnection getConnection(TServerInstance server) throws TException {
@@ -404,14 +358,8 @@ public class LiveTServerSet implements W
}
public synchronized void remove(TServerInstance server) {
- TServerInfo remove = current.remove(server.hostPort());
- if (remove != null) {
- try {
- remove.cleanup();
- } catch (Exception e) {
- log.info("error cleaning up connection to server", e);
- }
- }
+ current.remove(server.hostPort());
+
log.info("Removing zookeeper lock for " + server);
String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + server.hostPort();
try {