You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by vi...@apache.org on 2013/06/24 20:38:07 UTC
svn commit: r1496155 - in /accumulo/branches/ACCUMULO-CURATOR:
fate/src/main/java/org/apache/accumulo/fate/zookeeper/
server/src/main/java/org/apache/accumulo/server/conf/
server/src/main/java/org/apache/accumulo/server/master/
server/src/main/java/org...
Author: vines
Date: Mon Jun 24 18:38:06 2013
New Revision: 1496155
URL: http://svn.apache.org/r1496155
Log:
It seems to work at this point
Modified:
accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
Modified: accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java?rev=1496155&r1=1496154&r2=1496155&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java Mon Jun 24 18:38:06 2013
@@ -29,8 +29,10 @@ import org.apache.curator.framework.reci
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.Watcher;
/**
* Caches values stored in zookeeper and keeps them up to date as they change in zookeeper.
@@ -45,31 +47,40 @@ public class ZooCache {
private CuratorFramework curator;
public ZooCache(String zooKeepers, int sessionTimeout) {
- this(zooKeepers, sessionTimeout, null);
+ this(CuratorUtil.constructCurator(zooKeepers, sessionTimeout, null));
}
- public ZooCache(String zooKeepers, int sessionTimeout, Watcher watcher) {
- this(CuratorUtil.constructCurator(zooKeepers, sessionTimeout, null), watcher);
- }
-
- public ZooCache(CuratorFramework curator, Watcher watcher) {
+ public ZooCache(CuratorFramework curator) {
this.curator = curator;
this.nodeCache = new HashMap<String,NodeCache>();
this.childrenCache = new HashMap<String,PathChildrenCache>();
}
public synchronized List<ChildData> getChildren(final String zPath) {
+ return getChildren(zPath, null);
+ }
+
+ public synchronized List<ChildData> getChildren(String zPath, PathChildrenCacheListener listener) {
PathChildrenCache cache = childrenCache.get(zPath);
if (cache == null) {
cache = new PathChildrenCache(curator, zPath, true);
+ if (listener != null) {
+ cache.getListenable().addListener(listener);
+ }
try {
cache.start(StartMode.BUILD_INITIAL_CACHE);
+ // BUILD_INITIAL_CACHE primes the cache without firing events for existing children, so replay them to the listener here
+ if (listener != null)
+ for (ChildData cd : cache.getCurrentData()) {
+ listener.childEvent(curator, new PathChildrenCacheEvent(Type.INITIALIZED, cd));
+ }
- // Because parent's children are being watched, we don't need a node watcher on the individual node
+ // Because parent's children are being watched, we don't need to cache the individual node
+ // UNLESS we have a listener on it
for (ChildData child : cache.getCurrentData()) {
NodeCache childCache = nodeCache.get(child.getPath());
- if (childCache != null)
- {
+ if (childCache != null && childCache.getListenable().size() == 0) {
+ log.debug("Removing cache " + childCache.getCurrentData().getPath() + " because parent cache was added");
childCache.close();
nodeCache.remove(child.getPath());
}
@@ -85,6 +96,8 @@ public class ZooCache {
return null;
}
childrenCache.put(zPath, cache);
+ } else if (listener != null) {
+ log.debug("LISTENER- cache is null for path " + zPath + ", but got listener " + listener.getClass() + ". this is a broken case!");
}
return cache.getCurrentData();
}
@@ -168,6 +181,10 @@ public class ZooCache {
childrenCache.clear();
}
+ public CuratorFramework getCurator() {
+ return curator;
+ }
+
public synchronized void clear(String zPath) {
List<String> pathsToRemove = new ArrayList<String>();
for (Iterator<String> i = nodeCache.keySet().iterator(); i.hasNext();) {
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java?rev=1496155&r1=1496154&r2=1496155&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfWatcher.java Mon Jun 24 18:38:06 2013
@@ -16,95 +16,35 @@
*/
package org.apache.accumulo.server.conf;
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.log4j.Level;
+import org.apache.accumulo.fate.curator.CuratorUtil;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.log4j.Logger;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-class TableConfWatcher implements Watcher {
- static {
- Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
- Logger.getLogger("org.apache.hadoop.io.compress").setLevel(Level.WARN);
- }
-
+class TableConfWatcher implements PathChildrenCacheListener {
private static final Logger log = Logger.getLogger(TableConfWatcher.class);
- private Instance instance = null;
-
- TableConfWatcher(Instance instance) {
- this.instance = instance;
+ private TableConfiguration tableConfig;
+
+ TableConfWatcher(TableConfiguration tableConfiguration) {
+ tableConfig = tableConfiguration;
}
@Override
- public void process(WatchedEvent event) {
- String path = event.getPath();
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
if (log.isTraceEnabled())
- log.trace("WatchEvent : " + path + " " + event.getState() + " " + event.getType());
-
- String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES + "/";
-
- String tableId = null;
- String key = null;
-
- if (path != null) {
- if (path.startsWith(tablesPrefix)) {
- tableId = path.substring(tablesPrefix.length());
- if (tableId.contains("/")) {
- tableId = tableId.substring(0, tableId.indexOf('/'));
- if (path.startsWith(tablesPrefix + tableId + Constants.ZTABLE_CONF + "/"))
- key = path.substring((tablesPrefix + tableId + Constants.ZTABLE_CONF + "/").length());
- }
- }
-
- if (tableId == null) {
- log.warn("Zookeeper told me about a path I was not watching " + path + " state=" + event.getState() + " type=" + event.getType());
- return;
- }
- }
+ log.trace("WatchEvent : " + event.getData().getPath() + " " + event.getType());
+ String key = CuratorUtil.getNodeName(event.getData());
switch (event.getType()) {
- case NodeDataChanged:
- if (log.isTraceEnabled())
- log.trace("EventNodeDataChanged " + event.getPath());
- if (key != null)
- ServerConfiguration.getTableConfiguration(instance, tableId).propertyChanged(key);
- break;
- case NodeChildrenChanged:
- ServerConfiguration.getTableConfiguration(instance, tableId).propertiesChanged(key);
- break;
- case NodeDeleted:
- if (key == null) {
- // only remove the AccumuloConfiguration object when a
- // table node is deleted, not when a tables property is
- // deleted.
- ServerConfiguration.removeTableIdInstance(tableId);
- }
- break;
- case None:
- switch (event.getState()) {
- case Expired:
- ServerConfiguration.expireAllTableObservers();
- break;
- case SyncConnected:
- break;
- case Disconnected:
- break;
- default:
- log.warn("EventNone event not handled path = " + event.getPath() + " state=" + event.getState());
- }
- break;
- case NodeCreated:
- switch (event.getState()) {
- case SyncConnected:
- break;
- default:
- log.warn("Event NodeCreated event not handled path = " + event.getPath() + " state=" + event.getState());
- }
+ case INITIALIZED:
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ case CHILD_REMOVED:
+ tableConfig.propertyChanged(key);
break;
default:
- log.warn("Event not handled path = " + event.getPath() + " state=" + event.getState() + " type = " + event.getType());
+ log.debug("Unhandled state " + event.getType() + " encountered for table " + tableConfig.getTableId() + ". Ignoring.");
}
}
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java?rev=1496155&r1=1496154&r2=1496155&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/conf/TableConfiguration.java Mon Jun 24 18:38:06 2013
@@ -26,14 +26,13 @@ import java.util.Set;
import java.util.TreeMap;
import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ConfigurationObserver;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.zookeeper.ZooUtil;
import org.apache.accumulo.fate.curator.CuratorUtil;
-import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.server.client.HdfsZooInstance;
+import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.log4j.Logger;
@@ -53,20 +52,14 @@ public class TableConfiguration extends
this.parent = parent;
this.observers = Collections.synchronizedSet(new HashSet<ConfigurationObserver>());
- }
-
- /**
- * @deprecated not for client use
- */
- @Deprecated
- private static ZooCache getTablePropCache() {
- Instance inst = HdfsZooInstance.getInstance();
+
if (tablePropCache == null)
synchronized (TableConfiguration.class) {
if (tablePropCache == null)
- tablePropCache = new ZooCache(inst.getZooKeepers(), inst.getZooKeepersSessionTimeOut(), new TableConfWatcher(inst));
+ tablePropCache = new ZooCache(HdfsZooInstance.getInstance().getConfiguration());
}
- return tablePropCache;
+ String confPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + '/' + table + Constants.ZTABLE_CONF;
+ tablePropCache.getChildren(confPath, new TableConfWatcher(this));
}
public void addObserver(ConfigurationObserver co) {
@@ -100,7 +93,7 @@ public class TableConfiguration extends
co.propertyChanged(key);
}
- public void propertiesChanged(String key) {
+ public void propertiesChanged() {
Collection<ConfigurationObserver> copy = Collections.unmodifiableCollection(observers);
for (ConfigurationObserver co : copy)
co.propertiesChanged();
@@ -120,7 +113,7 @@ public class TableConfiguration extends
private String get(String key) {
String zPath = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF + "/" + key;
- ChildData v = getTablePropCache().get(zPath);
+ ChildData v = tablePropCache.get(zPath);
String value = null;
if (v != null)
value = new String(v.getData());
@@ -135,7 +128,7 @@ public class TableConfiguration extends
entries.put(parentEntry.getKey(), parentEntry.getValue());
String path = ZooUtil.getRoot(instanceId) + Constants.ZTABLES + "/" + table + Constants.ZTABLE_CONF;
- List<ChildData> children = getTablePropCache().getChildren(path);
+ List<ChildData> children = tablePropCache.getChildren(path);
if (children != null) {
for (ChildData child : children) {
entries.put(CuratorUtil.getNodeName(child), new String(child.getData()));
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1496155&r1=1496154&r2=1496155&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Mon Jun 24 18:38:06 2013
@@ -20,9 +20,14 @@ import static org.apache.accumulo.fate.z
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import org.apache.accumulo.core.Constants;
@@ -36,6 +41,7 @@ import org.apache.accumulo.core.tabletse
import org.apache.accumulo.core.util.ServerServices;
import org.apache.accumulo.core.util.ThriftUtil;
import org.apache.accumulo.core.zookeeper.ZooUtil;
+import org.apache.accumulo.fate.curator.CuratorUtil;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.security.SecurityConstants;
import org.apache.accumulo.server.util.AddressUtil;
@@ -45,18 +51,18 @@ import org.apache.accumulo.server.zookee
import org.apache.accumulo.server.zookeeper.ZooLock;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.accumulo.trace.instrument.Tracer;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransport;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NotEmptyException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-public class LiveTServerSet implements Watcher {
+public class LiveTServerSet {
public interface Listener {
void update(LiveTServerSet current, Set<TServerInstance> deleted, Set<TServerInstance> added);
@@ -214,124 +220,133 @@ public class LiveTServerSet implements W
public synchronized ZooCache getZooCache() {
if (zooCache == null)
- zooCache = new ZooCache(this);
+ zooCache = new ZooCache();
return zooCache;
}
public synchronized void startListeningForTabletServerChanges() {
- scanServers();
SimpleTimer.getInstance().schedule(new Runnable() {
@Override
public void run() {
- scanServers();
+ synchronized (locklessServers) {
+ if (!locklessServers.isEmpty()) {
+ List<String> toRemove = new ArrayList<String>();
+ for (Entry<String,Long> entry : locklessServers.entrySet()) {
+ if (System.currentTimeMillis() - entry.getValue() > 600000) {
+ deleteServerNode(entry.getKey());
+ toRemove.add(entry.getKey());
+ }
+ }
+ locklessServers.keySet().removeAll(toRemove);
+ }
+ }
}
}, 0, 5000);
+
+ Collection<ChildData> result = getZooCache().getChildren(ZooUtil.getRoot(instance) + Constants.ZTSERVERS, serversListener);
+ log.debug("Attaching SERVERSLISTENER to " + (ZooUtil.getRoot(instance) + Constants.ZTSERVERS) + " - received " + result);
}
- public synchronized void scanServers() {
- try {
- final Set<TServerInstance> updates = new HashSet<TServerInstance>();
- final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-
- final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
-
- HashSet<String> all = new HashSet<String>(current.keySet());
- all.addAll(getZooCache().getChildKeys(path));
-
- locklessServers.keySet().retainAll(all);
-
- for (String server : all) {
- checkServer(updates, doomed, path, server);
- }
-
- // log.debug("Current: " + current.keySet());
- if (!doomed.isEmpty() || !updates.isEmpty())
- this.cback.update(this, doomed, updates);
- } catch (Exception ex) {
- log.error(ex, ex);
- }
- }
-
- private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
+ private void deleteServerNode(String server) {
try {
- ZooReaderWriter.getInstance().delete(serverNode, -1);
+ getZooCache().getCurator().delete().forPath(ZooUtil.getRoot(instance) + Constants.ZTSERVERS + '/' + server);
} catch (NotEmptyException ex) {
// race condition: tserver created the lock after our last check; we'll see it at the next check
} catch (NoNodeException nne) {
// someone else deleted it
+ } catch (Exception e) {
+ // Any other Curator failure is logged and not rethrown; the caller drops this server from locklessServers either way, so the delete is not retried
+ log.error(e,e);
}
}
- private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server)
- throws TException, InterruptedException, KeeperException {
-
- TServerInfo info = current.get(server);
+ private ServersDirectoryListener serversListener = new ServersDirectoryListener(this);
+ private TServerLockListener lockListener = new TServerLockListener(this);
+
+ private class ServersDirectoryListener implements PathChildrenCacheListener {
+ LiveTServerSet liveTServerSet;
- final String lockPath = path + "/" + server;
- ChildData lockData = ZooLock.getLockData(getZooCache(), lockPath);
+ public ServersDirectoryListener(LiveTServerSet liveTServerSet) {
+ this.liveTServerSet = liveTServerSet;
+ }
- if (lockData == null) {
- if (info != null) {
- doomed.add(info.instance);
- current.remove(server);
- }
-
- Long firstSeen = locklessServers.get(server);
- if (firstSeen == null) {
- locklessServers.put(server, System.currentTimeMillis());
- } else if (System.currentTimeMillis() - firstSeen > 600000) {
- deleteServerNode(path + "/" + server);
- locklessServers.remove(server);
- }
- } else {
- locklessServers.remove(server);
- ServerServices services = new ServerServices(new String(lockData.getData()));
- InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
- InetSocketAddress addr = AddressUtil.parseAddress(server);
- TServerInstance instance = new TServerInstance(client, lockData.getStat().getEphemeralOwner());
+ @Override
+ public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
+ final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+ log.debug("SERVERSLISTENER - Received event " + event.getType() + " for node " + event.getData().getPath());
+
+ String server = CuratorUtil.getNodeName(event.getData());
+ TServerInfo info = current.get(server);
- if (info == null) {
- updates.add(instance);
- current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
- } else if (!info.instance.equals(instance)) {
- doomed.add(info.instance);
- updates.add(instance);
- current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+ switch (event.getType()) {
+ case INITIALIZED:
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ getZooCache().getChildren(event.getData().getPath(), lockListener);
+ break;
+ case CHILD_REMOVED:
+ getZooCache().clear(event.getData().getPath());
+ if (info != null) {
+ doomed.add(info.instance);
+ current.remove(server);
+ }
+ break;
+ default:
+ log.debug("Unhandled state " + event.getType() + " encountered for tserver manager. Ignoring.");
}
+ if (!doomed.isEmpty())
+ liveTServerSet.cback.update(liveTServerSet, doomed, Collections.<TServerInstance> emptySet());
}
}
- @Override
- public void process(WatchedEvent event) {
-
- // its important that these event are propagated by ZooCache, because this ensures when reading zoocache that is has already processed the event and cleared
- // relevant nodes before code below reads from zoocache
-
- if (event.getPath() != null) {
- if (event.getPath().endsWith(Constants.ZTSERVERS)) {
- scanServers();
- } else if (event.getPath().contains(Constants.ZTSERVERS)) {
- int pos = event.getPath().lastIndexOf('/');
-
- // do only if ZTSERVER is parent
- if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
-
- String server = event.getPath().substring(pos + 1);
-
- final Set<TServerInstance> updates = new HashSet<TServerInstance>();
- final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
-
- final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+ private class TServerLockListener implements PathChildrenCacheListener {
+ LiveTServerSet liveTServerSet;
+
+ public TServerLockListener(LiveTServerSet liveTServerSet) {
+ this.liveTServerSet = liveTServerSet;
+ }
+
+ @Override
+ public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event) throws Exception {
+ final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+ final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+ log.debug("LOCKLISTENER - Received event " + event.getType() + " for node " + event.getData().getPath());
+
+ String server = CuratorUtil.getNodeName(CuratorUtil.getNodeParent(event.getData()));
+ TServerInfo info = current.get(server);
+
+ switch (event.getType()) {
+ case INITIALIZED:
+ case CHILD_ADDED:
+ case CHILD_UPDATED:
+ synchronized (locklessServers) {
+ locklessServers.remove(server);
+ }
+ ServerServices services = new ServerServices(new String(event.getData().getData()));
+ InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+ InetSocketAddress addr = AddressUtil.parseAddress(server);
+ TServerInstance instance = new TServerInstance(client, event.getData().getStat().getEphemeralOwner());
- try {
- checkServer(updates, doomed, path, server);
- if (!doomed.isEmpty() || !updates.isEmpty())
- this.cback.update(this, doomed, updates);
- } catch (Exception ex) {
- log.error(ex, ex);
+ if (info == null) {
+ updates.add(instance);
+ current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+ } else if (!info.instance.equals(instance)) {
+ doomed.add(info.instance);
+ updates.add(instance);
+ current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
}
- }
+ break;
+ case CHILD_REMOVED:
+ synchronized (locklessServers) {
+ locklessServers.put(server, System.currentTimeMillis());
+ }
+ break;
+ default:
+ log.debug("Unhandled state " + event.getType() + " encountered for tserver lock manager. Ignoring.");
}
+
+ if (!doomed.isEmpty() || !updates.isEmpty())
+ liveTServerSet.cback.update(liveTServerSet, doomed, updates);
}
}
@@ -354,6 +369,7 @@ public class LiveTServerSet implements W
for (TServerInfo c : current.values()) {
result.add(c.instance);
}
+ log.debug("Returning " + result + " for current tservers");
return result;
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java?rev=1496155&r1=1496154&r2=1496155&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/MetaDataTableScanner.java Mon Jun 24 18:38:06 2013
@@ -58,7 +58,9 @@ public class MetaDataTableScanner implem
mdScanner.setRanges(Collections.singletonList(range));
iter = mdScanner.iterator();
} catch (Exception ex) {
- mdScanner.close();
+ log.error(ex, ex);
+ if (mdScanner != null)
+ mdScanner.close();
throw new RuntimeException(ex);
}
}
Modified: accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java
URL: http://svn.apache.org/viewvc/accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java?rev=1496155&r1=1496154&r2=1496155&view=diff
==============================================================================
--- accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java (original)
+++ accumulo/branches/ACCUMULO-CURATOR/server/src/main/java/org/apache/accumulo/server/master/state/tables/TableManager.java Mon Jun 24 18:38:06 2013
@@ -37,12 +37,13 @@ import org.apache.accumulo.server.client
import org.apache.accumulo.server.util.TablePropUtil;
import org.apache.accumulo.server.zookeeper.ZooCache;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent.Type;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
public class TableManager {
private static SecurityPermission TABLE_MANAGER_PERMISSION = new SecurityPermission("tableManagerPermission");
@@ -82,7 +83,8 @@ public class TableManager {
private TableManager() {
instance = HdfsZooInstance.getInstance();
- zooStateCache = new ZooCache(new TableStateWatcher());
+ zooStateCache = new ZooCache();
+ setupListeners();
updateTableStateCache();
}
@@ -159,10 +161,11 @@ public class TableManager {
}
}
- public TableState updateTableStateCache(String tableId) {
+ // tableId argument for better debug statements
+ private TableState updateTableStateCache(ChildData node, String tableId) {
synchronized (tableStateCache) {
TableState tState = TableState.UNKNOWN;
- byte[] data = zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE).getData();
+ byte[] data = node.getData();
if (data != null) {
String sState = new String(data);
try {
@@ -177,6 +180,10 @@ public class TableManager {
}
}
+ public TableState updateTableStateCache(String tableId) {
+ return updateTableStateCache(zooStateCache.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE), tableId);
+ }
+
public void addTable(String tableId, String tableName, NodeExistsPolicy existsPolicy) throws KeeperException, InterruptedException {
prepareNewTableState(instance.getInstanceID(), tableId, tableName, TableState.NEW, existsPolicy);
updateTableStateCache(tableId);
@@ -221,74 +228,46 @@ public class TableManager {
return observers.remove(to);
}
- private class TableStateWatcher implements Watcher {
+ // Sets up cache listeners for the zookeeper cache
+ private void setupListeners() {
+ zooStateCache.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES, new AllTablesListener());
+ }
+
+ // This just manages the listeners for each table. Let the table listener do the heavy lifting
+ private class AllTablesListener implements PathChildrenCacheListener {
@Override
- public void process(WatchedEvent event) {
- if (log.isTraceEnabled())
- log.trace(event);
-
- final String zPath = event.getPath();
- final EventType zType = event.getType();
-
- String tablesPrefix = ZooUtil.getRoot(instance) + Constants.ZTABLES;
- String tableId = null;
-
- if (zPath != null && zPath.startsWith(tablesPrefix + "/")) {
- String suffix = zPath.substring(tablesPrefix.length() + 1);
- if (suffix.contains("/")) {
- String[] sa = suffix.split("/", 2);
- if (Constants.ZTABLE_STATE.equals("/" + sa[1]))
- tableId = sa[0];
- }
- if (tableId == null) {
- log.warn("Unknown path in " + event);
- return;
- }
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+ if (event.getType().equals(Type.CHILD_ADDED)) {
+ zooStateCache.getChildren(event.getData().getPath(), new TableListener());
+ } else if (event.getType().equals(Type.CHILD_REMOVED)) {
+ zooStateCache.clear(event.getData().getPath());
}
-
- switch (zType) {
- case NodeChildrenChanged:
- if (zPath != null && zPath.equals(tablesPrefix)) {
- updateTableStateCache();
- } else {
- log.warn("Unexpected path " + zPath);
- }
- break;
- case NodeCreated:
- case NodeDataChanged:
- // state transition
- TableState tState = updateTableStateCache(tableId);
- log.debug("State transition to " + tState + " @ " + event);
- synchronized (observers) {
- for (TableObserver to : observers)
- to.stateChanged(tableId, tState);
- }
- break;
- case NodeDeleted:
- if (zPath != null
- && tableId != null
- && (zPath.equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_STATE) || zPath.equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_CONF) || zPath
- .equals(tablesPrefix + "/" + tableId + Constants.ZTABLE_NAME)))
+ }
+ }
+
+ private class TableListener implements PathChildrenCacheListener {
+ @Override
+ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
+ if (event.getData().getPath().endsWith(Constants.ZTABLE_STATE)) {
+ String tableId = CuratorUtil.getNodeName(CuratorUtil.getNodeParent(event.getData().getPath()));
+ String msg = null;
+ switch (event.getType()) {
+ case CHILD_ADDED:
+ case INITIALIZED:
+ msg = "Initializing ";
+ case CHILD_UPDATED:
+ if (msg == null)
+ msg = "Updating ";
+ TableState state = updateTableStateCache(event.getData(), tableId);
+ log.debug(msg + tableId + " to state " + state);
+ break;
+ case CHILD_REMOVED:
tableStateCache.remove(tableId);
- break;
- case None:
- switch (event.getState()) {
- case Expired:
- if (log.isTraceEnabled())
- log.trace("Session expired " + event);
- synchronized (observers) {
- for (TableObserver to : observers)
- to.sessionExpired();
- }
- break;
- case SyncConnected:
- default:
- if (log.isTraceEnabled())
- log.trace("Ignored " + event);
- }
- break;
- default:
- log.warn("Unandled " + event);
+ log.debug("Table " + tableId + " removed.");
+ break;
+ default:
+ log.debug("Unhandled state " + event.getType() + " encountered for table " + tableId + ". Ignoring.");
+ }
}
}
}