You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/07/10 19:49:03 UTC

svn commit: r1501877 - in /activemq/trunk: ./ activemq-leveldb-store/ activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/groups/ activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ activemq-leveldb-sto...

Author: chirino
Date: Wed Jul 10 17:49:03 2013
New Revision: 1501877

URL: http://svn.apache.org/r1501877
Log:
Improve the replicated leveldb bits: Avoid dependencies on fabric-group stuff.  Makes it easier to embed in different versions of a fabric osgi env.

Added:
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/groups/
    activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/groups/ZKClient.java
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
Modified:
    activemq/trunk/activemq-leveldb-store/pom.xml
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
    activemq/trunk/activemq-osgi/pom.xml
    activemq/trunk/assembly/pom.xml
    activemq/trunk/assembly/src/main/descriptors/common-bin.xml
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-leveldb-store/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/pom.xml?rev=1501877&r1=1501876&r2=1501877&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/pom.xml (original)
+++ activemq/trunk/activemq-leveldb-store/pom.xml Wed Jul 10 17:49:03 2013
@@ -114,24 +114,26 @@
       <version>${hawtdispatch-version}</version>
       <scope>provided</scope>
     </dependency>
+
     <dependency>
-      <groupId>org.fusesource.fabric</groupId>
-      <artifactId>fabric-groups</artifactId>
-      <version>${fabric-version}</version>
+      <groupId>org.linkedin</groupId>
+      <artifactId>org.linkedin.zookeeper-impl</artifactId>
+      <version>${linkedin-zookeeper-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.fusesource.fabric</groupId>
-      <artifactId>fabric-linkedin-zookeeper</artifactId>
-      <version>${fabric-version}</version>
+      <groupId>org.linkedin</groupId>
+      <artifactId>org.linkedin.util-core</artifactId>
+      <version>${linkedin-zookeeper-version}</version>
       <scope>provided</scope>
     </dependency>
     <dependency>
-      <groupId>org.fusesource.fabric</groupId>
-      <artifactId>fabric-zookeeper</artifactId>
-      <version>${fabric-version}</version>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper-version}</version>
       <scope>provided</scope>
     </dependency>
+
     <dependency>
       <groupId>org.osgi</groupId>
       <artifactId>org.osgi.core</artifactId>

Added: activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/groups/ZKClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/groups/ZKClient.java?rev=1501877&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/groups/ZKClient.java (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/java/org/apache/activemq/leveldb/replicated/groups/ZKClient.java Wed Jul 10 17:49:03 2013
@@ -0,0 +1,631 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated.groups;
+
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.linkedin.util.clock.Clock;
+import org.linkedin.util.clock.SystemClock;
+import org.linkedin.util.clock.Timespan;
+import org.linkedin.util.concurrent.ConcurrentUtils;
+import org.linkedin.util.io.PathUtils;
+import org.linkedin.zookeeper.client.ChrootedZKClient;
+import org.linkedin.zookeeper.client.IZooKeeper;
+import org.linkedin.zookeeper.client.IZooKeeperFactory;
+import org.linkedin.zookeeper.client.LifecycleListener;
+import org.linkedin.zookeeper.client.ZooKeeperFactory;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.service.cm.ConfigurationException;
+import org.slf4j.Logger;
+
+public class ZKClient extends org.linkedin.zookeeper.client.AbstractZKClient implements Watcher {
+
+    private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(ZKClient.class.getName());
+
+    private Map<String, String> acls;
+    private String password;
+
+
+    public void start() throws Exception {
+        // Grab the lock to make sure that the registration of the ManagedService
+        // won't be updated immediately but that the initial update will happen first
+        synchronized (_lock) {
+            _stateChangeDispatcher.setDaemon(true);
+            _stateChangeDispatcher.start();
+
+            doStart();
+        }
+    }
+
+    public void setACLs(Map<String, String> acls) {
+        this.acls = acls;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+
+    protected void doStart() throws InvalidSyntaxException, ConfigurationException, UnsupportedEncodingException {
+        connect();
+    }
+
+    @Override
+    public void close() {
+        if (_stateChangeDispatcher != null) {
+            _stateChangeDispatcher.end();
+            try {
+                _stateChangeDispatcher.join(1000);
+            } catch(Exception e) {
+                LOG.debug("ignored exception", e);
+            }
+        }
+        synchronized(_lock) {
+            if (_zk != null) {
+                try {
+                    changeState(State.NONE);
+                    _zk.close();
+                    // We try to avoid a NPE when shutting down fabric:
+                    // java.lang.NullPointerException
+                    //     at org.apache.felix.framework.BundleWiringImpl.findClassOrResourceByDelegation(BundleWiringImpl.java:1433)
+                    //     at org.apache.felix.framework.BundleWiringImpl.access$400(BundleWiringImpl.java:73)
+                    //     at org.apache.felix.framework.BundleWiringImpl$BundleClassLoader.loadClass(BundleWiringImpl.java:1844)
+                    //     at java.lang.ClassLoader.loadClass(ClassLoader.java:247)
+                    //     at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1089)
+                    Thread th = getSendThread();
+                    if (th != null) {
+                        th.join(1000);
+                    }
+                    _zk = null;
+                } catch(Exception e) {
+                    LOG.debug("ignored exception", e);
+                }
+            }
+        }
+    }
+
+    protected Thread getSendThread() {
+        try {
+            return (Thread) getField(_zk, "_zk", "cnxn", "sendThread");
+        } catch (Throwable e) {
+            return null;
+        }
+    }
+
+    protected Object getField(Object obj, String... names) throws Exception {
+        for (String name : names) {
+            obj = getField(obj, name);
+        }
+        return obj;
+    }
+
+    protected Object getField(Object obj, String name) throws Exception {
+        Class clazz = obj.getClass();
+        while (clazz != null) {
+            for (Field f : clazz.getDeclaredFields()) {
+                if (f.getName().equals(name)) {
+                    f.setAccessible(true);
+                    return f.get(obj);
+                }
+            }
+        }
+        throw new NoSuchFieldError(name);
+    }
+
+    protected void changeState(State newState) {
+        synchronized (_lock) {
+            State oldState = _state;
+            if (oldState != newState) {
+                _stateChangeDispatcher.addEvent(oldState, newState);
+                _state = newState;
+                _lock.notifyAll();
+            }
+        }
+    }
+
+    public void testGenerateConnectionLoss() throws Exception {
+        waitForConnected();
+        Object clientCnxnSocket  = getField(_zk, "_zk", "cnxn", "sendThread", "clientCnxnSocket");
+        callMethod(clientCnxnSocket, "testableCloseSocket");
+    }
+
+    protected Object callMethod(Object obj, String name, Object... args) throws Exception {
+        Class clazz = obj.getClass();
+        while (clazz != null) {
+            for (Method m : clazz.getDeclaredMethods()) {
+                if (m.getName().equals(name)) {
+                    m.setAccessible(true);
+                    return m.invoke(obj, args);
+                }
+            }
+        }
+        throw new NoSuchMethodError(name);
+    }
+
+    protected void tryConnect() {
+        synchronized (_lock) {
+            try {
+                connect();
+            } catch (Throwable e) {
+                LOG.warn("Error while restarting:", e);
+                if (_expiredSessionRecovery == null) {
+                    _expiredSessionRecovery = new ExpiredSessionRecovery();
+                    _expiredSessionRecovery.setDaemon(true);
+                    _expiredSessionRecovery.start();
+                }
+            }
+        }
+    }
+
+    public void connect() throws UnsupportedEncodingException {
+        synchronized (_lock) {
+            changeState(State.CONNECTING);
+            _zk = _factory.createZooKeeper(this);
+            if (password != null) {
+                _zk.addAuthInfo("digest", ("fabric:" + password).getBytes("UTF-8"));
+            }
+        }
+    }
+
+    public void process(WatchedEvent event) {
+        if (event.getState() != null) {
+            LOG.debug("event: {}", event.getState());
+            synchronized (_lock) {
+                switch(event.getState())
+                {
+                    case SyncConnected:
+                        changeState(State.CONNECTED);
+                        break;
+
+                    case Disconnected:
+                        if(_state != State.NONE) {
+                            changeState(State.RECONNECTING);
+                        }
+                        break;
+
+                    case Expired:
+                        // when expired, the zookeeper object is invalid and we need to recreate a new one
+                        _zk = null;
+                        LOG.warn("Expiration detected: trying to restart...");
+                        tryConnect();
+                        break;
+                    default:
+                        LOG.warn("unprocessed event state: {}", event.getState());
+                }
+            }
+        }
+    }
+
+    @Override
+    protected IZooKeeper getZk() {
+        State state = _state;
+        if (state == State.NONE) {
+            throw new IllegalStateException("ZooKeeper client has not been configured yet. You need to either create an ensemble or join one.");
+        } else if (state != State.CONNECTED) {
+            try {
+                waitForConnected();
+            } catch (Exception e) {
+                throw new IllegalStateException("Error waiting for ZooKeeper connection", e);
+            }
+        }
+        IZooKeeper zk = _zk;
+        if (zk == null) {
+            throw new IllegalStateException("No ZooKeeper connection available");
+        }
+        return zk;
+    }
+
+    public void waitForConnected(Timespan timeout) throws InterruptedException, TimeoutException {
+        waitForState(State.CONNECTED, timeout);
+    }
+
+    public void waitForConnected() throws InterruptedException, TimeoutException {
+        waitForConnected(null);
+    }
+
+    public void waitForState(State state, Timespan timeout) throws TimeoutException, InterruptedException {
+        long endTime = (timeout == null ? sessionTimeout : timeout).futureTimeMillis(_clock);
+        if (_state != state) {
+            synchronized (_lock) {
+                while (_state != state) {
+                    ConcurrentUtils.awaitUntil(_clock, _lock, endTime);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void registerListener(LifecycleListener listener) {
+        if (listener == null) {
+            throw new IllegalStateException("listener is null");
+        }
+        if (!_listeners.contains(listener)) {
+            _listeners.add(listener);
+
+        }
+        if (_state == State.CONNECTED) {
+            listener.onConnected();
+            //_stateChangeDispatcher.addEvent(null, State.CONNECTED);
+        }
+    }
+
+    @Override
+    public void removeListener(LifecycleListener listener) {
+        if (listener == null) {
+            throw new IllegalStateException("listener is null");
+        }
+        _listeners.remove(listener);
+    }
+
+    @Override
+    public org.linkedin.zookeeper.client.IZKClient chroot(String path) {
+        return new ChrootedZKClient(this, adjustPath(path));
+    }
+
+    @Override
+    public boolean isConnected() {
+        return _state == State.CONNECTED;
+    }
+
+    public boolean isConfigured() {
+        return _state != State.NONE;
+    }
+
+    @Override
+    public String getConnectString() {
+        return _factory.getConnectString();
+    }
+
+    public static enum State {
+        NONE,
+        CONNECTING,
+        CONNECTED,
+        RECONNECTING
+    }
+
+    private final static String CHARSET = "UTF-8";
+
+    private final Clock _clock = SystemClock.instance();
+    private final List<LifecycleListener> _listeners = new CopyOnWriteArrayList<LifecycleListener>();
+
+    protected final Object _lock = new Object();
+    protected volatile State _state = State.NONE;
+
+    private final StateChangeDispatcher _stateChangeDispatcher = new StateChangeDispatcher();
+
+    protected IZooKeeperFactory _factory;
+    protected IZooKeeper _zk;
+    protected Timespan _reconnectTimeout = Timespan.parse("20s");
+    protected Timespan sessionTimeout = new Timespan(30, Timespan.TimeUnit.SECOND);
+
+    private ExpiredSessionRecovery _expiredSessionRecovery = null;
+
+    private class StateChangeDispatcher extends Thread {
+        private final AtomicBoolean _running = new AtomicBoolean(true);
+        private final BlockingQueue<Boolean> _events = new LinkedBlockingQueue<Boolean>();
+
+        private StateChangeDispatcher() {
+            super("ZooKeeper state change dispatcher thread");
+        }
+
+        @Override
+        public void run() {
+            Map<Object, Boolean> history = new IdentityHashMap<Object, Boolean>();
+            LOG.info("Starting StateChangeDispatcher");
+            while (_running.get()) {
+                Boolean isConnectedEvent;
+                try {
+                    isConnectedEvent = _events.take();
+                } catch (InterruptedException e) {
+                    continue;
+                }
+                if (!_running.get() || isConnectedEvent == null) {
+                    continue;
+                }
+                Map<Object, Boolean> newHistory = callListeners(history, isConnectedEvent);
+                // we save which event each listener has seen last
+                // we don't update the map in place because we need to get rid of unregistered listeners
+                history = newHistory;
+            }
+            LOG.info("StateChangeDispatcher terminated.");
+        }
+
+        public void end() {
+            _running.set(false);
+            _events.add(false);
+        }
+
+        public void addEvent(ZKClient.State oldState, ZKClient.State newState) {
+            LOG.debug("addEvent: {} => {}", oldState, newState);
+            if (newState == ZKClient.State.CONNECTED) {
+                _events.add(true);
+            } else if (oldState == ZKClient.State.CONNECTED) {
+                _events.add(false);
+            }
+        }
+    }
+
+    protected Map<Object, Boolean> callListeners(Map<Object, Boolean> history, Boolean connectedEvent) {
+        Map<Object, Boolean> newHistory = new IdentityHashMap<Object, Boolean>();
+        for (LifecycleListener listener : _listeners) {
+            Boolean previousEvent = history.get(listener);
+            // we propagate the event only if it was not already sent
+            if (previousEvent == null || previousEvent != connectedEvent) {
+                try {
+                    if (connectedEvent) {
+                        listener.onConnected();
+                    } else {
+                        listener.onDisconnected();
+                    }
+                } catch (Throwable e) {
+                    LOG.warn("Exception while executing listener (ignored)", e);
+                }
+            }
+            newHistory.put(listener, connectedEvent);
+        }
+        return newHistory;
+    }
+
+    private class ExpiredSessionRecovery extends Thread {
+        private ExpiredSessionRecovery() {
+            super("ZooKeeper expired session recovery thread");
+        }
+
+        @Override
+        public void run() {
+            LOG.info("Entering recovery mode");
+            synchronized(_lock) {
+                try {
+                    int count = 0;
+                    while (_state == ZKClient.State.NONE) {
+                        try {
+                            count++;
+                            LOG.warn("Recovery mode: trying to reconnect to zookeeper [" + count + "]");
+                            ZKClient.this.connect();
+                        } catch (Throwable e) {
+                            LOG.warn("Recovery mode: reconnect attempt failed [" + count + "]... waiting for " + _reconnectTimeout, e);
+                            try {
+                                _lock.wait(_reconnectTimeout.getDurationInMilliseconds());
+                            } catch(InterruptedException e1) {
+                                throw new RuntimeException("Recovery mode: wait interrupted... bailing out", e1);
+                            }
+                        }
+                    }
+                } finally {
+                    _expiredSessionRecovery = null;
+                    LOG.info("Exiting recovery mode.");
+                }
+            }
+        }
+    }
+
+    /**
+     * Constructor
+     */
+    public ZKClient(String connectString, Timespan sessionTimeout, Watcher watcher)
+    {
+        this(new ZooKeeperFactory(connectString, sessionTimeout, watcher));
+    }
+
+    /**
+     * Constructor
+     */
+    public ZKClient(IZooKeeperFactory factory)
+    {
+        this(factory, null);
+    }
+
+    /**
+     * Constructor
+     */
+    public ZKClient(IZooKeeperFactory factory, String chroot)
+    {
+        super(chroot);
+        _factory = factory;
+        Map<String, String> acls = new HashMap<String, String>();
+        acls.put("/", "world:anyone:acdrw");
+        setACLs(acls);
+    }
+
+    static private int getPermFromString(String permString) {
+        int perm = 0;
+        for (int i = 0; i < permString.length(); i++) {
+            switch (permString.charAt(i)) {
+                case 'r':
+                    perm |= ZooDefs.Perms.READ;
+                    break;
+                case 'w':
+                    perm |= ZooDefs.Perms.WRITE;
+                    break;
+                case 'c':
+                    perm |= ZooDefs.Perms.CREATE;
+                    break;
+                case 'd':
+                    perm |= ZooDefs.Perms.DELETE;
+                    break;
+                case 'a':
+                    perm |= ZooDefs.Perms.ADMIN;
+                    break;
+                default:
+                    System.err
+                            .println("Unknown perm type: " + permString.charAt(i));
+            }
+        }
+        return perm;
+    }
+
+    private static List<ACL> parseACLs(String aclString) {
+        List<ACL> acl;
+        String acls[] = aclString.split(",");
+        acl = new ArrayList<ACL>();
+        for (String a : acls) {
+            int firstColon = a.indexOf(':');
+            int lastColon = a.lastIndexOf(':');
+            if (firstColon == -1 || lastColon == -1 || firstColon == lastColon) {
+                System.err
+                        .println(a + " does not have the form scheme:id:perm");
+                continue;
+            }
+            ACL newAcl = new ACL();
+            newAcl.setId(new Id(a.substring(0, firstColon), a.substring(
+                    firstColon + 1, lastColon)));
+            newAcl.setPerms(getPermFromString(a.substring(lastColon + 1)));
+            acl.add(newAcl);
+        }
+        return acl;
+    }
+
+    public Stat createOrSetByteWithParents(String path, byte[] data, List<ACL> acl, CreateMode createMode) throws InterruptedException, KeeperException {
+        if (exists(path) != null) {
+            return setByteData(path, data);
+        }
+        try {
+            createBytesNodeWithParents(path, data, acl, createMode);
+            return null;
+        } catch(KeeperException.NodeExistsException e) {
+            // this should not happen very often (race condition)
+            return setByteData(path, data);
+        }
+    }
+
+    public String create(String path, CreateMode createMode) throws InterruptedException, KeeperException {
+        return create(path, (byte[]) null, createMode);
+    }
+
+    public String create(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
+        return create(path, toByteData(data), createMode);
+    }
+
+    public String create(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
+        return getZk().create(adjustPath(path), data, getNodeACLs(path), createMode);
+    }
+
+    public String createWithParents(String path, CreateMode createMode) throws InterruptedException, KeeperException {
+        return createWithParents(path, (byte[]) null, createMode);
+    }
+
+    public String createWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
+        return createWithParents(path, toByteData(data), createMode);
+    }
+
+    public String createWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
+        createParents(path);
+        return create(path, data, createMode);
+    }
+
+    public Stat createOrSetWithParents(String path, String data, CreateMode createMode) throws InterruptedException, KeeperException {
+        return createOrSetWithParents(path, toByteData(data), createMode);
+    }
+
+    public Stat createOrSetWithParents(String path, byte[] data, CreateMode createMode) throws InterruptedException, KeeperException {
+        if (exists(path) != null) {
+            return setByteData(path, data);
+        }
+        try {
+            createWithParents(path, data, createMode);
+            return null;
+        } catch (KeeperException.NodeExistsException e) {
+            // this should not happen very often (race condition)
+            return setByteData(path, data);
+        }
+    }
+
+    public void fixACLs(String path, boolean recursive) throws InterruptedException, KeeperException {
+        if (exists(path) != null) {
+            doFixACLs(path, recursive);
+        }
+    }
+
+    private void doFixACLs(String path, boolean recursive) throws KeeperException, InterruptedException {
+        setACL(path, getNodeACLs(path), -1);
+        if (recursive) {
+            for (String child : getChildren(path)) {
+                doFixACLs(path.equals("/") ? "/" + child : path + "/" + child, recursive);
+            }
+        }
+    }
+
+    private List<ACL> getNodeACLs(String path) {
+        String acl = doGetNodeACLs(adjustPath(path));
+        if (acl == null) {
+            throw new IllegalStateException("Could not find matching ACLs for " + path);
+        }
+        return parseACLs(acl);
+    }
+
+    protected String doGetNodeACLs(String path) {
+        String longestPath = "";
+        for (String acl : acls.keySet()) {
+            if (acl.length() > longestPath.length() && path.startsWith(acl)) {
+                longestPath = acl;
+            }
+        }
+        return acls.get(longestPath);
+    }
+
+    private void createParents(String path) throws InterruptedException, KeeperException {
+        path = PathUtils.getParentPath(adjustPath(path));
+        path = PathUtils.removeTrailingSlash(path);
+        List<String> paths = new ArrayList<String>();
+        while(!path.equals("") && getZk().exists(path, false) == null) {
+            paths.add(path);
+            path = PathUtils.getParentPath(path);
+            path = PathUtils.removeTrailingSlash(path);
+        }
+        Collections.reverse(paths);
+        for(String p : paths) {
+            try {
+                getZk().create(p,
+                        null,
+                        getNodeACLs(p),
+                        CreateMode.PERSISTENT);
+            } catch(KeeperException.NodeExistsException e) {
+                // ok we continue...
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("parent already exists " + p);
+                }
+            }
+        }
+    }
+
+    private byte[] toByteData(String data) {
+        if (data == null) {
+            return null;
+        } else {
+            try {
+                return data.getBytes(CHARSET);
+            } catch(UnsupportedEncodingException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala?rev=1501877&r1=1501876&r2=1501877&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/ElectingLevelDBStore.scala Wed Jul 10 17:49:03 2013
@@ -16,8 +16,6 @@
  */
 package org.apache.activemq.leveldb.replicated
 
-import org.fusesource.fabric.groups._
-import org.fusesource.fabric.zookeeper.internal.ZKClient
 import org.linkedin.util.clock.Timespan
 import scala.reflect.BeanProperty
 import org.apache.activemq.util.{JMXSupport, ServiceStopper, ServiceSupport}
@@ -38,6 +36,7 @@ import org.apache.activemq.leveldb.Level
 import javax.management.ObjectName
 import javax.management.openmbean.{CompositeDataSupport, SimpleType, CompositeType, CompositeData}
 import java.util
+import org.apache.activemq.leveldb.replicated.groups._
 
 object ElectingLevelDBStore extends Log {
 

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala?rev=1501877&r1=1501876&r2=1501877&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/MasterElector.scala Wed Jul 10 17:49:03 2013
@@ -1,6 +1,6 @@
 package org.apache.activemq.leveldb.replicated
 
-import org.fusesource.fabric.groups._
+import org.apache.activemq.leveldb.replicated.groups._
 import org.codehaus.jackson.annotate.JsonProperty
 import org.apache.activemq.leveldb.util.{Log, JsonCodec}
 

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala?rev=1501877&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/ClusteredSingleton.scala Wed Jul 10 17:49:03 2013
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated.groups
+
+
+import collection.mutable.{ListBuffer, HashMap}
+import internal.ChangeListenerSupport
+
+import java.io._
+import org.codehaus.jackson.map.ObjectMapper
+import collection.JavaConversions._
+import java.util.LinkedHashMap
+import java.lang.{IllegalStateException, String}
+import reflect.BeanProperty
+import org.codehaus.jackson.annotate.JsonProperty
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait NodeState {
+
+  /**
+   * The id of the cluster node.  There can be multiple node with this ID,
+   * but only the first node in the cluster will be the master for for it.
+   */
+  def id: String
+
+  override
+  def toString = new String(ClusteredSupport.encode(this), "UTF-8")
+}
+
+class TextNodeState extends NodeState {
+  @BeanProperty
+  @JsonProperty
+  var id:String = _
+}
+
+/**
+ *
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ClusteredSupport {
+
+  val DEFAULT_MAPPER = new ObjectMapper
+
+  def decode[T](t : Class[T], buffer: Array[Byte], mapper: ObjectMapper=DEFAULT_MAPPER): T = decode(t, new ByteArrayInputStream(buffer), mapper)
+  def decode[T](t : Class[T], in: InputStream, mapper: ObjectMapper): T =  mapper.readValue(in, t)
+
+  def encode(value: AnyRef, mapper: ObjectMapper=DEFAULT_MAPPER): Array[Byte] = {
+    var baos: ByteArrayOutputStream = new ByteArrayOutputStream
+    encode(value, baos, mapper)
+    return baos.toByteArray
+  }
+
+  def encode(value: AnyRef, out: OutputStream, mapper: ObjectMapper): Unit = {
+    mapper.writeValue(out, value)
+  }
+
+}
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ClusteredSingletonWatcher[T <: NodeState](val stateClass:Class[T]) extends ChangeListenerSupport {
+  import ClusteredSupport._
+  
+  protected var _group:Group = _
+  def group = _group
+
+  /**
+   * Override to use a custom configured mapper.
+   */
+  def mapper = ClusteredSupport.DEFAULT_MAPPER
+
+  private val listener = new ChangeListener() {
+    def changed() {
+      val members = _group.members
+      val t = new LinkedHashMap[String, T]()
+      members.foreach {
+        case (path, data) =>
+          try {
+            val value = decode(stateClass, data, mapper)
+            t.put(path, value)
+          } catch {
+            case e: Throwable =>
+              e.printStackTrace()
+          }
+      }
+      changed_decoded(t)
+    }
+
+    def connected = {
+      changed
+      ClusteredSingletonWatcher.this.fireConnected
+    }
+
+    def disconnected = {
+      changed
+      ClusteredSingletonWatcher.this.fireDisconnected
+    }
+  }
+
+
+  def start(group:Group) = this.synchronized {
+    if(_group !=null )
+      throw new IllegalStateException("Already started.")
+    _group = group
+    _group.add(listener)
+  }
+
+  def stop = this.synchronized {
+    if(_group==null)
+      throw new IllegalStateException("Not started.")
+    _group.remove(listener)
+    _members = HashMap[String, ListBuffer[(String,  T)]]()
+    _group = null
+  }
+
+  def connected = this.synchronized {
+    if(_group==null) {
+      false
+    } else {
+      _group.connected
+    }
+  }
+
+  protected var _members = HashMap[String, ListBuffer[(String,  T)]]()
+  def members = this.synchronized { _members }
+
+  def changed_decoded(m: LinkedHashMap[String, T]) = {
+    this.synchronized {
+      if( _group!=null ) {
+        _members = HashMap[String, ListBuffer[(String,  T)]]()
+        m.foreach { case node =>
+          _members.getOrElseUpdate(node._2.id, ListBuffer[(String,  T)]()).append(node)
+        }
+      }
+    }
+    fireChanged
+  }
+
+  def masters = this.synchronized {
+    _members.mapValues(_.head._2).toArray.map(_._2).toArray(new ClassManifest[T] {
+      def erasure = stateClass
+    })
+  }
+
+}
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ClusteredSingleton[T <: NodeState ](stateClass:Class[T]) extends ClusteredSingletonWatcher[T](stateClass) {
+  import ClusteredSupport._
+
+  private var _eid:String = _
+  /** the ephemeral id of the node is unique within in the group */
+  def eid = _eid
+  
+  private var _state:T = _
+
+  override def stop = {
+    this.synchronized {
+      if(_eid != null) {
+        leave
+      }
+      super.stop
+    }
+  }
+
+  def join(state:T):Unit = this.synchronized {
+    if(state==null)
+      throw new IllegalArgumentException("State cannot be null")
+    if(state.id==null)
+      throw new IllegalArgumentException("The state id cannot be null")
+    if(_group==null)
+      throw new IllegalStateException("Not started.")
+    if(this._state!=null)
+      throw new IllegalStateException("Already joined")
+    this._state = state
+    _eid = group.join(encode(state, mapper))
+  }
+
+  def leave:Unit = this.synchronized {
+    if(this._state==null)
+      throw new IllegalStateException("Not joined")
+    if(_group==null)
+      throw new IllegalStateException("Not started.")
+    _group.leave(_eid)
+    _eid = null
+    this._state = null.asInstanceOf[T]
+  }
+
+  def update(state:T) = this.synchronized {
+    if(this._state==null)
+      throw new IllegalStateException("Not joined")
+    if(state==null)
+      throw new IllegalArgumentException("State cannot be null")
+    if(state.id==null)
+      throw new IllegalArgumentException("The state id cannot be null")
+    if(state.id!=this._state.id)
+      throw new IllegalArgumentException("The state id cannot change")
+
+    if(_group==null)
+      throw new IllegalStateException("Not started.")
+    this._state = state
+    _group.update(_eid, encode(state, mapper))
+  }
+
+  def isMaster:Boolean = this.synchronized {
+    if(this._state==null)
+      return false;
+    _members.get(this._state.id) match {
+      case Some(nodes) =>
+        nodes.headOption.map { x=>
+          x._1 == _eid
+        }.getOrElse(false)
+      case None => false
+    }
+  }
+
+  def master = this.synchronized {
+    if(this._state==null)
+      throw new IllegalStateException("Not joined")
+    _members.get(this._state.id).map(_.head._2)
+  }
+
+  def slaves = this.synchronized {
+    if(this._state==null)
+      throw new IllegalStateException("Not joined")
+    val rc = _members.get(this._state.id).map(_.toList).getOrElse(List())
+    rc.drop(1).map(_._2)
+  }
+
+}

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala?rev=1501877&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/Group.scala Wed Jul 10 17:49:03 2013
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated.groups
+
+import internal.ZooKeeperGroup
+import org.apache.zookeeper.data.ACL
+import org.apache.zookeeper.ZooDefs.Ids
+import java.util.LinkedHashMap
+
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ZooKeeperGroupFactory {
+
+  def create(zk: ZKClient, path: String):Group = new ZooKeeperGroup(zk, path)
+  def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = ZooKeeperGroup.members(zk, path)
+}
+
+/**
+ * <p>
+ *   Used the join a cluster group and to monitor the memberships
+ *   of that group.
+ * </p>
+ * <p>
+ *   This object is not thread safe.  You should are responsible for
+ *   synchronizing access to it across threads.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Group {
+
+  /**
+   * Adds a member to the group with some associated data.
+   */
+  def join(data:Array[Byte]):String
+
+  /**
+   * Updates the data associated with joined member.
+   */
+  def update(id:String, data:Array[Byte]):Unit
+
+  /**
+   * Removes a previously added member.
+   */
+  def leave(id:String):Unit
+
+  /**
+   * Lists all the members currently in the group.
+   */
+  def members:java.util.LinkedHashMap[String, Array[Byte]]
+
+  /**
+   * Registers a change listener which will be called
+   * when the cluster membership changes.
+   */
+  def add(listener:ChangeListener)
+
+  /**
+   * Removes a previously added change listener.
+   */
+  def remove(listener:ChangeListener)
+
+  /**
+   * A group should be closed to release aquired resources used
+   * to monitor the group membership.
+   *
+   * Whe the Group is closed, any memberships registered via this
+   * Group will be removed from the group.
+   */
+  def close:Unit
+
+  /**
+   * Are we connected with the cluster?
+   */
+  def connected:Boolean
+}
+
+/**
+ * <p>
+ *   Callback interface used to get notifications of changes
+ *   to a cluster group.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ChangeListener {
+  def changed:Unit
+  def connected:Unit
+  def disconnected:Unit
+}
+

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala?rev=1501877&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ChangeListenerSupport.scala Wed Jul 10 17:49:03 2013
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated.groups.internal
+
+import org.apache.activemq.leveldb.replicated.groups.ChangeListener
+import org.slf4j.{Logger, LoggerFactory}
+import java.util.concurrent.TimeUnit
+
+
+object ChangeListenerSupport {
+    val LOG: Logger = LoggerFactory.getLogger(classOf[ChangeListenerSupport])
+}
+/**
+ * <p>
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait ChangeListenerSupport {
+
+  var listeners = List[ChangeListener]()
+
+  def connected:Boolean
+
+  def add(listener: ChangeListener): Unit = {
+    val connected = this.synchronized {
+      listeners ::= listener
+      this.connected
+    }
+    if (connected) {
+      listener.connected
+    }
+  }
+
+  def remove(listener: ChangeListener): Unit = this.synchronized {
+    listeners = listeners.filterNot(_ == listener)
+  }
+
+  def fireConnected() = {
+    val listener = this.synchronized { this.listeners }
+    check_elapsed_time {
+      for (listener <- listeners) {
+        listener.connected
+      }
+    }
+  }
+
+  def fireDisconnected() = {
+    val listener = this.synchronized { this.listeners }
+    check_elapsed_time {
+      for (listener <- listeners) {
+        listener.disconnected
+      }
+    }
+  }
+
+  def fireChanged() = {
+    val listener = this.synchronized { this.listeners }
+    val start = System.nanoTime()
+    check_elapsed_time {
+      for (listener <- listeners) {
+        listener.changed
+      }
+    }
+  }
+
+  def check_elapsed_time[T](func: => T):T = {
+    val start = System.nanoTime()
+    try {
+      func
+    } finally {
+      val end = System.nanoTime()
+      val elapsed = TimeUnit.NANOSECONDS.toMillis(end-start)
+      if( elapsed > 100 ) {
+        ChangeListenerSupport.LOG.warn("listeners are taking too long to process the events")
+      }
+    }
+  }
+  
+}
\ No newline at end of file

Added: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala?rev=1501877&view=auto
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala (added)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/replicated/groups/internal/ZooKeeperGroup.scala Wed Jul 10 17:49:03 2013
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.leveldb.replicated.groups.internal
+
+import org.apache.zookeeper._
+import java.lang.String
+import org.linkedin.zookeeper.tracker._
+import org.apache.activemq.leveldb.replicated.groups.{ZKClient, ChangeListener, Group}
+import scala.collection.mutable.HashMap
+import org.linkedin.zookeeper.client.LifecycleListener
+import collection.JavaConversions._
+import java.util.{LinkedHashMap, Collection}
+import org.apache.zookeeper.KeeperException.{ConnectionLossException, NoNodeException, Code}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+object ZooKeeperGroup {
+  def members(zk: ZKClient, path: String):LinkedHashMap[String, Array[Byte]] = {
+    var rc = new LinkedHashMap[String, Array[Byte]]
+    zk.getAllChildren(path).sortWith((a,b)=> a < b).foreach { node =>
+      try {
+        if( node.matches("""0\d+""") ) {
+          rc.put(node, zk.getData(path+"/"+node))
+        } else {
+          None
+        }
+      } catch {
+        case e:Throwable =>
+          e.printStackTrace
+      }
+    }
+    rc
+
+  }
+
+
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class ZooKeeperGroup(val zk: ZKClient, val root: String) extends Group with LifecycleListener with ChangeListenerSupport {
+
+  val tree = new ZooKeeperTreeTracker[Array[Byte]](zk, new ZKByteArrayDataReader, root, 1)
+  val joins = HashMap[String, Int]()
+
+  var members = new LinkedHashMap[String, Array[Byte]]
+
+  private def member_path_prefix = root + "/0"
+
+  zk.registerListener(this)
+
+  create(root)
+  tree.track(new NodeEventsListener[Array[Byte]]() {
+    def onEvents(events: Collection[NodeEvent[Array[Byte]]]): Unit = {
+      fire_cluster_change
+    }
+  })
+  fire_cluster_change
+
+
+  def close = this.synchronized {
+    joins.foreach { case (path, version) =>
+      try {
+        zk.delete(member_path_prefix + path, version)
+      } catch {
+        case x:NoNodeException => // Already deleted.
+      }
+    }
+    joins.clear
+    tree.destroy
+    zk.removeListener(this)
+  }
+
+  def connected = zk.isConnected
+  def onConnected() = fireConnected()
+  def onDisconnected() = fireDisconnected()
+
+  def join(data:Array[Byte]=null): String = this.synchronized {
+    val id = zk.createWithParents(member_path_prefix, data, CreateMode.EPHEMERAL_SEQUENTIAL).stripPrefix(member_path_prefix)
+    joins.put(id, 0)
+    id
+  }
+
+  def update(path:String, data:Array[Byte]=null): Unit = this.synchronized {
+    joins.get(path) match {
+      case Some(ver) =>
+        val stat = zk.setData(member_path_prefix+path, data, ver)
+        joins.put(path, stat.getVersion)
+      case None => throw new IllegalArgumentException("Has not joined locally: "+path)
+    }
+  }
+
+  def leave(path:String): Unit = this.synchronized {
+    joins.remove(path).foreach {
+      case version =>
+          try {
+            zk.delete(member_path_prefix + path, version)
+          } catch {
+            case x: NoNodeException => // Already deleted.
+            case x: ConnectionLossException => // disconnected
+          }
+    }
+  }
+
+  private def fire_cluster_change: Unit = {
+    this.synchronized {
+      val t = tree.getTree.toList.filterNot { x =>
+      // don't include the root node, or nodes that don't match our naming convention.
+        (x._1 == root) || !x._1.stripPrefix(root).matches("""/0\d+""")
+      }
+
+      this.members = new LinkedHashMap()
+      t.sortWith((a,b)=> a._1 < b._1 ).foreach { x=>
+        this.members.put(x._1.stripPrefix(member_path_prefix), x._2.getData)
+      }
+    }
+    fireChanged()
+  }
+
+  private def create(path: String, count : java.lang.Integer = 0): Unit = {
+    try {
+      if (zk.exists(path, false) != null) {
+        return
+      }
+      try {
+        // try create given path in persistent mode
+        zk.createOrSetWithParents(path, "", CreateMode.PERSISTENT)
+      } catch {
+        case ignore: KeeperException.NodeExistsException =>
+      }
+    } catch {
+      case ignore : KeeperException.SessionExpiredException => {
+        if (count > 20) {
+          // we tried enought number of times
+          throw new IllegalStateException("Cannot create path " + path, ignore)
+        }
+        // try to create path with increased counter value
+        create(path, count + 1)
+      }
+    }
+  }
+
+}
\ No newline at end of file

Modified: activemq/trunk/activemq-osgi/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-osgi/pom.xml?rev=1501877&r1=1501876&r2=1501877&view=diff
==============================================================================
--- activemq/trunk/activemq-osgi/pom.xml (original)
+++ activemq/trunk/activemq-osgi/pom.xml Wed Jul 10 17:49:03 2013
@@ -52,8 +52,6 @@
   	  org.codehaus.jettison*;resolution:=optional,
   	  org.jasypt*;resolution:=optional,
   	  org.eclipse.jetty*;resolution:=optional,
-  	  org.fusesource.fabric*;version="[7,8]";resolution:=optional,
-  	  org.fusesource.fabric.groups*;version="[7,8]";resolution:=optional,
       org.apache.zookeeper;resolution:=optional,
       org.linkedin*;resolution:=optional,
   	  org.springframework.jms*;version="[3,4]";resolution:=optional,

Modified: activemq/trunk/assembly/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/pom.xml?rev=1501877&r1=1501876&r2=1501877&view=diff
==============================================================================
--- activemq/trunk/assembly/pom.xml (original)
+++ activemq/trunk/assembly/pom.xml Wed Jul 10 17:49:03 2013
@@ -82,20 +82,21 @@
       <version>${hawtdispatch-version}</version>
     </dependency>
     <dependency>
-      <groupId>org.fusesource.fabric</groupId>
-      <artifactId>fabric-groups</artifactId>
-      <version>${fabric-version}</version>
+      <groupId>org.linkedin</groupId>
+      <artifactId>org.linkedin.zookeeper-impl</artifactId>
+      <version>${linkedin-zookeeper-version}</version>
     </dependency>
     <dependency>
-      <groupId>org.fusesource.fabric</groupId>
-      <artifactId>fabric-linkedin-zookeeper</artifactId>
-      <version>${fabric-version}</version>
+      <groupId>org.linkedin</groupId>
+      <artifactId>org.linkedin.util-core</artifactId>
+      <version>${linkedin-zookeeper-version}</version>
     </dependency>
     <dependency>
-      <groupId>org.fusesource.fabric</groupId>
-      <artifactId>fabric-zookeeper</artifactId>
-      <version>${fabric-version}</version>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper-version}</version>
     </dependency>
+    
     <dependency>
       <groupId>org.osgi</groupId>
       <artifactId>org.osgi.core</artifactId>

Modified: activemq/trunk/assembly/src/main/descriptors/common-bin.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/main/descriptors/common-bin.xml?rev=1501877&r1=1501876&r2=1501877&view=diff
==============================================================================
--- activemq/trunk/assembly/src/main/descriptors/common-bin.xml (original)
+++ activemq/trunk/assembly/src/main/descriptors/common-bin.xml Wed Jul 10 17:49:03 2013
@@ -223,8 +223,10 @@
         <include>org.xerial.snappy:*</include>
         <include>org.iq80.snappy:*</include>
         <include>org.codehaus.jackson:*</include>
-        <include>org.fusesource.fabric:*</include>
-
+        <include>org.linkedin:org.linkedin.zookeeper-impl</include>
+        <include>org.linkedin:org.linkedin.util-core</include>
+        <include>org.apache.zookeeper:zookeeper</include>
+        
       </includes>
     </dependencySet>
     <dependencySet>

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1501877&r1=1501876&r2=1501877&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Wed Jul 10 17:49:03 2013
@@ -94,8 +94,9 @@
     <opensymphony-version>2.4.2</opensymphony-version>
     <org-apache-derby-version>10.9.1.0</org-apache-derby-version>
     <org.osgi.core-version>4.3.1</org.osgi.core-version>
-    <fabric-version>7.2.0.redhat-024</fabric-version>
     <p2psockets-version>1.1.2</p2psockets-version>
+    <linkedin-zookeeper-version>1.4.0</linkedin-zookeeper-version>
+    <zookeeper-version>3.4.3</zookeeper-version>
     <qpid-proton-version>0.3.0-fuse-4</qpid-proton-version>
     <qpid-jms-version>0.22</qpid-jms-version>
     <regexp-version>1.3</regexp-version>