You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/08 23:11:22 UTC

svn commit: r1444259 - /accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java

Author: kturner
Date: Fri Feb  8 22:11:22 2013
New Revision: 1444259

URL: http://svn.apache.org/r1444259
Log:
ACCUMULO-1049 modified master to stop locking tserver nodes and just monitor tserver nodes in zookeeper

Modified:
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1444259&r1=1444258&r2=1444259&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Fri Feb  8 22:11:22 2013
@@ -22,13 +22,9 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
@@ -49,6 +45,7 @@ import org.apache.accumulo.server.util.t
 import org.apache.accumulo.server.zookeeper.ZooCache;
 import org.apache.accumulo.server.zookeeper.ZooLock;
 import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.trace.instrument.Tracer;
 import org.apache.hadoop.io.Text;
 import org.apache.log4j.Logger;
 import org.apache.thrift.TException;
@@ -57,7 +54,6 @@ import org.apache.zookeeper.KeeperExcept
 import org.apache.zookeeper.KeeperException.NotEmptyException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.data.Stat;
 
 public class LiveTServerSet implements Watcher {
@@ -189,26 +185,17 @@ public class LiveTServerSet implements W
   }
   
   static class TServerInfo {
-    ZooLock lock;
     TServerConnection connection;
     TServerInstance instance;
-    TServerLockWatcher watcher;
     
-    TServerInfo(ZooLock lock, TServerInstance instance, TServerConnection connection, TServerLockWatcher watcher) {
-      this.lock = lock;
+    TServerInfo(TServerInstance instance, TServerConnection connection) {
       this.connection = connection;
       this.instance = instance;
-      this.watcher = watcher;
-    }
-    
-    void cleanup() throws InterruptedException, KeeperException {
-      lock.tryToCancelAsyncLockOrUnlock();
     }
   };
   
   // Map from tserver master service to server information
   private Map<String,TServerInfo> current = new HashMap<String,TServerInfo>();
-  private HashMap<String,Long> serversToDelete = new HashMap<String,Long>();
 
   public LiveTServerSet(Instance instance, AccumuloConfiguration conf, Listener cback) {
     this.cback = cback;
@@ -230,7 +217,7 @@ public class LiveTServerSet implements W
       public void run() {
         scanServers();
       }
-    }, 0, 1000);
+    }, 0, 5000);
   }
   
   public synchronized void scanServers() {
@@ -240,21 +227,11 @@ public class LiveTServerSet implements W
       
       final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
       
-      Iterator<Entry<String,Long>> serversToDelIter = serversToDelete.entrySet().iterator();
-      while (serversToDelIter.hasNext()) {
-        Entry<String,Long> entry = serversToDelIter.next();
-        if (System.currentTimeMillis() - entry.getValue() > 10000) {
-          String serverNode = path + "/" + entry.getKey();
-          serversToDelIter.remove();
-          deleteServerNode(serverNode);
-        }
-      }
-
-      for (String server : getZooCache().getChildren(path)) {
-        if (serversToDelete.containsKey(server))
-          continue;
-
-        checkServer(updates, doomed, path, server, 2);
+      HashSet<String> all = new HashSet<String>(current.keySet());
+      all.addAll(getZooCache().getChildren(path));
+      
+      for (String server : all) {
+        checkServer(updates, doomed, path, server);
       }
 
       // log.debug("Current: " + current.keySet());
@@ -275,94 +252,71 @@ public class LiveTServerSet implements W
     }
   }
   
-  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server,
-      int recurse)
-      throws TException,
-      InterruptedException, KeeperException {
-    
-    if (recurse == 0)
-      return;
+  private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server)
+      throws TException, InterruptedException, KeeperException {
 
-    // See if we have an async lock in place?
     TServerInfo info = current.get(server);
-    TServerLockWatcher watcher;
-    ZooLock lock;
+    
     final String lockPath = path + "/" + server;
-    if (info != null) {
-      // yep: get out the lock/watcher so we can check on it
-      watcher = info.watcher;
-      lock = info.lock;
-    } else {
-      // nope: create a new lock and watcher
-      lock = new ZooLock(lockPath);
-      watcher = new TServerLockWatcher();
-      lock.lockAsync(watcher, "master".getBytes());
-    }
-    TServerInstance instance = null;
-    // Did we win the lock yet?
-    if (!lock.isLocked() && !watcher.gotLock && watcher.failureException == null) {
-      // Nope... there's a server out there: is this is a new server?
-      if (info == null) {
-        // Yep: hold onto the information about this server
-        Stat stat = new Stat();
-        byte[] lockData = ZooLock.getLockData(lockPath, stat);
-        String lockString = new String(lockData == null ? new byte[] {} : lockData);
-        if (lockString.length() > 0 && !lockString.equals("master")) {
-          ServerServices services = new ServerServices(new String(lockData));
-          InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
-          InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
-          TServerConnection conn = new TServerConnection(addr);
-          instance = new TServerInstance(client, stat.getEphemeralOwner());
-          info = new TServerInfo(lock, instance, conn, watcher);
-          current.put(server, info);
-          updates.add(instance);
-        } else {
-          lock.tryToCancelAsyncLockOrUnlock();
-        }
-      }
-    } else {
-      // Yes... there is no server here any more
-      lock.tryToCancelAsyncLockOrUnlock();
+    Stat stat = new Stat();
+    byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
+
+    if (lockData == null) {
       if (info != null) {
-        // a server existed here and went away so delete its node
         doomed.add(info.instance);
         current.remove(server);
-        info.cleanup();
-        deleteServerNode(lockPath);
-      } else {
-        // never knew of this server before... it could be a new server that has not created its lock node yet... watch and see if it creates the node or
-        // delete it later if it does not
-        List<String> children = ZooReaderWriter.getInstance().getChildren(lockPath, new Watcher() {
-          @Override
-          public void process(WatchedEvent arg0) {
-            if (arg0.getType() == EventType.NodeChildrenChanged) {
-              Set<TServerInstance> updates = new HashSet<TServerInstance>();
-              Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-              try {
-                checkServer(updates, doomed, path, server, 2);
-              } catch (Exception ex) {
-                log.error(ex, ex);
-              }
-
-              if (!doomed.isEmpty() || !updates.isEmpty())
-                cback.update(LiveTServerSet.this, doomed, updates);
-            }
-          }
-        });
-        
-        if (children.size() > 0) {
-          checkServer(updates, doomed, path, server, recurse--);
-        } else {
-          if (!serversToDelete.containsKey(server))
-            serversToDelete.put(server, System.currentTimeMillis());
-        }
+      }
+      
+      deleteServerNode(path + "/" + server);
+    } else {
+      ServerServices services = new ServerServices(new String(lockData));
+      InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+      InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
+      TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
+      
+      if (info == null) {
+        updates.add(instance);
+        current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+      } else if (!info.instance.equals(instance)) {
+        doomed.add(info.instance);
+        updates.add(instance);
+        current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
       }
     }
   }
-  
+
   @Override
   public void process(WatchedEvent event) {
-    scanServers();
+
+    // It's important that these events are propagated by ZooCache, because this ensures that ZooCache has already processed the event and cleared
+    // the relevant nodes before the code below reads from ZooCache.
+
+    if (event.getPath() != null) {
+      if (event.getPath().endsWith(Constants.ZTSERVERS)) {
+        scanServers();
+      } else if (event.getPath().contains(Constants.ZTSERVERS)) {
+        int pos = event.getPath().lastIndexOf('/');
+
+        // proceed only if the ZTSERVERS node is the direct parent of the event path
+        if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
+          
+          String server = event.getPath().substring(pos + 1);
+          
+          final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+          final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+          
+          final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+
+          try {
+            checkServer(updates, doomed, path, server);
+            if (!doomed.isEmpty() || !updates.isEmpty())
+              this.cback.update(this, doomed, updates);
+          } catch (Exception ex) {
+            log.error(ex, ex);
+          }
+        }
+      }
+    }
   }
   
   public synchronized TServerConnection getConnection(TServerInstance server) throws TException {
@@ -404,14 +358,8 @@ public class LiveTServerSet implements W
   }
   
   public synchronized void remove(TServerInstance server) {
-    TServerInfo remove = current.remove(server.hostPort());
-    if (remove != null) {
-      try {
-        remove.cleanup();
-      } catch (Exception e) {
-        log.info("error cleaning up connection to server", e);
-      }
-    }
+    current.remove(server.hostPort());
+
     log.info("Removing zookeeper lock for " + server);
     String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + server.hostPort();
     try {