You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2010/01/20 13:57:17 UTC

svn commit: r901172 - in /incubator/cassandra/trunk/contrib/mutex: ./ lib/ src/ src/org/ src/org/apache/ src/org/apache/cassandra/ src/org/apache/cassandra/mutex/

Author: jaakko
Date: Wed Jan 20 12:57:16 2010
New Revision: 901172

URL: http://svn.apache.org/viewvc?rev=901172&view=rev
Log:
Add cluster-wide mutex. Patch by jaakko, reviewed by jbellis. CASSANDRA-704

Added:
    incubator/cassandra/trunk/contrib/mutex/
    incubator/cassandra/trunk/contrib/mutex/README
    incubator/cassandra/trunk/contrib/mutex/cassandra.patch
    incubator/cassandra/trunk/contrib/mutex/lib/
    incubator/cassandra/trunk/contrib/mutex/lib/zookeeper-3.2.2.jar   (with props)
    incubator/cassandra/trunk/contrib/mutex/src/
    incubator/cassandra/trunk/contrib/mutex/src/org/
    incubator/cassandra/trunk/contrib/mutex/src/org/apache/
    incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/
    incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/
    incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java
    incubator/cassandra/trunk/contrib/mutex/storage-conf.xml

Added: incubator/cassandra/trunk/contrib/mutex/README
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/README?rev=901172&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/mutex/README (added)
+++ incubator/cassandra/trunk/contrib/mutex/README Wed Jan 20 12:57:16 2010
@@ -0,0 +1,9 @@
+Provides a cluster-wide mutex backed by ZooKeeper. In order to use it, you need to
+
+(1) apply cassandra.patch to add the relevant configuration options
+
+(2) add the ZooKeeper configuration (storage-conf.xml) to your Cassandra
+configuration file
+
+(3) copy src/org/apache/cassandra/mutex/ClusterMutex.java into Cassandra's
+src/java/org/apache/cassandra/mutex/ directory, and lib/zookeeper-3.2.2.jar into Cassandra's lib/

Added: incubator/cassandra/trunk/contrib/mutex/cassandra.patch
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/cassandra.patch?rev=901172&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/mutex/cassandra.patch (added)
+++ incubator/cassandra/trunk/contrib/mutex/cassandra.patch Wed Jan 20 12:57:16 2010
@@ -0,0 +1,50 @@
+diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+index 426544b..28a53f6 100644
+--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
++++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+@@ -70,6 +70,9 @@ public class DatabaseDescriptor
+     private static int replicationFactor_ = 3;
+     private static long rpcTimeoutInMillis_ = 2000;
+     private static Set<InetAddress> seeds_ = new HashSet<InetAddress>();
++    private static String zooKeeperRoot_ = "locks";
++    private static String zooKeeperPort_ = "2181";
++    private static Set<String> zooKeepers_ = new HashSet<String>();
+     /* Keeps the list of data file directories */
+     private static String[] dataFileDirectories_;
+     /* Current index into the above list of directories */
+@@ -579,6 +582,13 @@ public class DatabaseDescriptor
+             {
+                 seeds_.add(InetAddress.getByName(seeds[i]));
+             }
++
++            /* zookeepers */
++            zooKeeperRoot_ = xmlUtils.getNodeValue("/Storage/ZooKeeperRoot");
++            zooKeeperPort_ = xmlUtils.getNodeValue("/Storage/ZooKeeperPort");
++            String[] zooKeepers = xmlUtils.getNodeValues("/Storage/ZooKeepers/ZooKeeper");
++            for (int i=0; i<zooKeepers.length; ++i)
++                zooKeepers_.add(zooKeepers[i]);
+         }
+         catch (ConfigurationException e)
+         {
+@@ -891,6 +901,21 @@ public class DatabaseDescriptor
+         return seeds_;
+     }
+ 
++    public static String getZooKeeperRoot()
++    {
++        return zooKeeperRoot_;
++    }
++
++    public static String getZooKeeperPort()
++    {
++        return zooKeeperPort_;
++    }
++
++    public static Set<String> getZooKeepers()
++    {
++        return zooKeepers_;
++    }
++
+     public static String getColumnFamilyType(String tableName, String cfName)
+     {
+         assert tableName != null;

Added: incubator/cassandra/trunk/contrib/mutex/lib/zookeeper-3.2.2.jar
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/lib/zookeeper-3.2.2.jar?rev=901172&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/cassandra/trunk/contrib/mutex/lib/zookeeper-3.2.2.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java?rev=901172&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java (added)
+++ incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java Wed Jan 20 12:57:16 2010
@@ -0,0 +1,342 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.mutex;
+
+import java.util.List;
+import java.io.IOException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+/**
+ * Cluster-wide mutex built on ZooKeeper.
+ *
+ * Every lock is a persistent znode {@code <root>/<lockName>}. A client queues for the lock by
+ * creating an EPHEMERAL_SEQUENTIAL child {@code lock-<sequence>} under it. The client whose child
+ * has the smallest sequence number holds the lock; every other client watches the child
+ * immediately in front of its own and re-checks the queue whenever ZooKeeper delivers an event.
+ *
+ * One ZooKeeper session is shared by all threads of this process. It is created lazily through
+ * {@link #instance()}, so installations that never use the mutex do not need ZooKeeper.
+ */
+public class ClusterMutex implements Watcher
+{
+    private static final Logger logger = Logger.getLogger(ClusterMutex.class);
+
+    // Lazy on purpose. People who do not want mutex, should not need to have to worry about ZK
+    private static class LazyHolder
+    {
+        private static final ClusterMutex clusterMutex = new ClusterMutex();
+    }
+
+    /**
+     * @return the process-wide mutex, connecting to ZooKeeper on first use. If that first
+     * initialization fails, the RuntimeException thrown by the constructor surfaces wrapped in
+     * an ExceptionInInitializerError.
+     */
+    public static ClusterMutex instance()
+    {
+        return LazyHolder.clusterMutex;
+    }
+
+    // Prefix of every lock znode. It must end with a hyphen (-): sequence numbers are parsed
+    // from whatever follows the last hyphen of a znode name.
+    private static final String LOCK_PREFIX = "lock-";
+
+    // ZooKeeper client port used when no ZooKeeperPort is configured
+    private static final String DEFAULT_PORT = "2181";
+
+    // if we're disconnected from ZooKeeper server, how many times shall we try the lock
+    // operation before giving up
+    private static final int OPERATION_RETRIES = 3;
+
+    // how long (ms) to sleep between retries. Actual time slept is RETRY_INTERVAL multiplied
+    // by how many times we have already tried.
+    private static final long RETRY_INTERVAL = 500L;
+
+    // Session timeout to ZooKeeper (ms); also the minimum interval between reconnect attempts
+    private static final int SESSION_TIMEOUT = 3000;
+
+    // time (ms) of the last reconnect attempt; guarded by "this"
+    private long lastConnect = 0;
+
+    // Current ZooKeeper handle. reestablishZooKeeperSession() replaces it while other threads
+    // may be reading it without holding "this", hence volatile.
+    private volatile ZooKeeper zk = null;
+
+    // absolute path of the znode under which all locks live ("/" when no root is configured)
+    private final String root;
+
+    // monitor lockInternal() waits on; process() notifies it on every ZooKeeper event
+    private final Object mutex = new Object();
+
+    // comma-separated "host:port" list handed to the ZooKeeper client
+    private final String hostString;
+
+    private ClusterMutex()
+    {
+        String zooKeeperRoot = DatabaseDescriptor.getZooKeeperRoot();
+        root = "/" + ((zooKeeperRoot != null) ? zooKeeperRoot : "");
+
+        // NOTE(review): presumably null when ZooKeeperPort is absent from the config; guard so
+        // we never build "host:null" addresses.
+        String zooKeeperPort = DatabaseDescriptor.getZooKeeperPort();
+        if (zooKeeperPort == null)
+            zooKeeperPort = DEFAULT_PORT;
+
+        StringBuilder hosts = new StringBuilder();
+        for (String zooKeeper : DatabaseDescriptor.getZooKeepers())
+        {
+            if (hosts.length() > 0)
+                hosts.append(',');
+            hosts.append(zooKeeper).append(':').append(zooKeeperPort);
+        }
+        hostString = hosts.toString();
+        if (logger.isDebugEnabled())
+            logger.debug("ZooKeeper connect string: " + hostString);
+
+        if (hostString.length() == 0)
+            throw new RuntimeException("ClusterMutex initialization failed: no ZooKeeper servers configured");
+
+        try
+        {
+            connectZooKeeper();
+            if (zk.exists(root, false) == null)
+            {
+                logger.info("Mutex root " + root + " does not exists, creating");
+                try
+                {
+                    zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                }
+                catch (NodeExistsException ignored)
+                {
+                    // another node created the root concurrently; that is exactly what we wanted
+                }
+            }
+        }
+        catch (Exception e)
+        {
+            if (e instanceof InterruptedException)
+                Thread.currentThread().interrupt();
+            throw new RuntimeException("ClusterMutex initialization failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Open a new ZooKeeper session unless a handle that is not CLOSED already exists. This does
+     * not wait for the session to become connected.
+     */
+    private synchronized void connectZooKeeper() throws IOException
+    {
+        if (zk != null && zk.getState() != ZooKeeper.States.CLOSED)
+            return;
+        logger.info("Connecting to ZooKeepers: " + hostString);
+        zk = new ZooKeeper(hostString, SESSION_TIMEOUT, this);
+    }
+
+    /**
+     * Close the current session and open a new one. Closing the session is what removes any
+     * ephemeral lock znodes it still owns. Calls within SESSION_TIMEOUT ms of the previous
+     * attempt are ignored so that ZooKeeper is not flooded with connection requests.
+     */
+    private synchronized void reestablishZooKeeperSession() throws IOException
+    {
+        long now = System.currentTimeMillis();
+
+        // let's not flood zookeeper with connection requests
+        if ((now - lastConnect) < SESSION_TIMEOUT)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Only " + (now - lastConnect) + "ms passed since last reconnect, not trying again yet");
+            return;
+        }
+
+        lastConnect = now;
+
+        try
+        {
+            zk.close();
+        }
+        catch (InterruptedException e)
+        {
+            // keep the interrupt visible to our caller instead of losing it
+            Thread.currentThread().interrupt();
+        }
+        catch (Exception e)
+        {
+            // ignore all other exceptions. we're calling this just to make sure ephemeral nodes
+            // are deleted. zk might be in an inconsistent state and cause exception.
+        }
+
+        connectZooKeeper();
+    }
+
+    /**
+     * process any events from ZooKeeper. We simply wake up any clients that are waiting for
+     * file deletion; they re-examine the lock queue themselves. Number of clients is usually
+     * very small (most likely just one), so no need for any complex logic.
+     */
+    @Override
+    public void process(WatchedEvent event)
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("Got event " + event.getType() + ", keeper state " + event.getState() + ", path " + event.getPath());
+
+        synchronized (mutex)
+        {
+            mutex.notifyAll();
+        }
+    }
+
+    private boolean isConnected()
+    {
+        return zk.getState() == ZooKeeper.States.CONNECTED;
+    }
+
+    /**
+     * Acquire the cluster-wide lock {@code lockName}, blocking until it is held.
+     *
+     * On session expiry or connection loss the session is re-established and the whole
+     * operation retried, up to OPERATION_RETRIES attempts with linearly growing back-off.
+     *
+     * @param lockName lock to be locked for writing. name can be any string, but it must not
+     * include slash (/) or any character disallowed by ZooKeeper (see
+     * hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkDataModel).
+     * @return name of the znode inside zookeeper holding this lock; pass it to unlock().
+     * @throws KeeperException.ConnectionLossException if every attempt hit a connection problem
+     * @throws KeeperException on any other ZooKeeper error
+     * @throws InterruptedException if the calling thread is interrupted while waiting or backing off
+     * @throws IOException if a new ZooKeeper session cannot be opened
+     */
+    public String lock(String lockName) throws KeeperException, InterruptedException, IOException
+    {
+        for (int attempt = 1; attempt <= OPERATION_RETRIES; attempt++)
+        {
+            try
+            {
+                return lockInternal(lockName);
+            }
+            catch (KeeperException.SessionExpiredException e)
+            {
+                logger.warn("ZooKeeper session expired, reconnecting");
+                reestablishZooKeeperSession();
+            }
+            catch (KeeperException.ConnectionLossException e)
+            {
+                // ZooKeeper handles lost connection automatically, but in order to reset all
+                // ephemeral nodes, we close the whole thing.
+                logger.warn("ZooKeeper connection lost, reconnecting");
+                reestablishZooKeeperSession();
+            }
+
+            // back off before the next attempt; an interrupt here propagates to the caller.
+            // No point sleeping after the final attempt.
+            if (attempt < OPERATION_RETRIES)
+                Thread.sleep(RETRY_INTERVAL * attempt);
+        }
+
+        throw new KeeperException.ConnectionLossException();
+    }
+
+    /**
+     * Build the path of the lock directory for {@code lockName}, avoiding "//name" when the
+     * root is "/" (no ZooKeeperRoot configured).
+     */
+    private String lockPathFor(String lockName)
+    {
+        return root.endsWith("/") ? root + lockName : root + "/" + lockName;
+    }
+
+    /**
+     * Parse the sequence number ZooKeeper appended after the last hyphen of a lock znode name.
+     */
+    private static int sequenceNumber(String znode)
+    {
+        return Integer.parseInt(znode.substring(znode.lastIndexOf('-') + 1));
+    }
+
+    /**
+     * creates lock znode in zookeeper under lockPath, creating lockPath itself if needed. Lock
+     * name is LOCK_PREFIX plus ephemeral sequence number given by zookeeper.
+     *
+     * @param lockPath name of the lock (directory in zookeeper)
+     * @return full path of the created lock znode
+     */
+    private String createLockZNode(String lockPath) throws KeeperException, InterruptedException
+    {
+        String prefixPath = lockPath + "/" + LOCK_PREFIX;
+
+        try
+        {
+            return zk.create(prefixPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+        }
+        catch (NoNodeException e)
+        {
+            logger.info(lockPath + " does not exist, creating");
+            try
+            {
+                zk.create(lockPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+            catch (NodeExistsException ignored)
+            {
+                // another client created the lock directory between our two calls; fine
+            }
+            return zk.create(prefixPath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+        }
+    }
+
+    /**
+     * lockInternal does the actual locking: queue a lock znode, then wait until no znode with
+     * a smaller sequence number remains.
+     *
+     * @param lockName same as in lock()
+     * @return full path of our lock znode once it holds the lock
+     */
+    private String lockInternal(String lockName) throws KeeperException, InterruptedException
+    {
+        String lockPath = lockPathFor(lockName);
+        String lockZNode = createLockZNode(lockPath);
+
+        if (logger.isTraceEnabled())
+            logger.trace("lockZNode created " + lockZNode);
+
+        boolean acquired = false;
+        try
+        {
+            while (true)
+            {
+                // check what is our ID (sequence number at the end of file name added by ZK)
+                int mySeqNum = sequenceNumber(lockZNode);
+                int previousSeqNum = -1;
+                String predecessor = null;
+
+                // get all children of lock znode and find the one that is just before us, if
+                // any. This must be inside loop, as children might get deleted out of order
+                // because of client disconnects. We cannot assume that the file that is in front
+                // of us this time, is there next time. It might have been deleted even though
+                // earlier files are still there.
+                List<String> children = zk.getChildren(lockPath, false);
+                if (children.isEmpty())
+                {
+                    logger.warn("No children in " + lockPath + " although one was just created. Going to try again");
+                    lockZNode = createLockZNode(lockPath);
+                    continue;
+                }
+                for (String child : children)
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("child: " + child);
+                    int otherSeqNum = sequenceNumber(child);
+                    if (otherSeqNum < mySeqNum && otherSeqNum > previousSeqNum)
+                    {
+                        previousSeqNum = otherSeqNum;
+                        predecessor = child;
+                    }
+                }
+
+                // our sequence number is smallest, we have the lock
+                if (predecessor == null)
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("No smaller znode sequences, " + lockZNode + " acquired lock");
+                    acquired = true;
+                    return lockZNode;
+                }
+
+                // There is at least one znode before us. The watch is registered while holding
+                // mutex, and process() needs mutex to notify, so the wake-up cannot slip in
+                // between exists() and wait().
+                synchronized (mutex)
+                {
+                    if (zk.exists(lockPath + "/" + predecessor, true) == null)
+                    {
+                        // The predecessor is gone, but an earlier znode may still hold the lock
+                        // (znodes disappear out of order on disconnects), so rescan rather than
+                        // assume we are first now.
+                        if (logger.isTraceEnabled())
+                            logger.trace(predecessor + " does not exists, rechecking whether " + lockZNode + " is first");
+                        continue;
+                    }
+                    if (logger.isTraceEnabled())
+                        logger.trace(predecessor + " is still here, " + lockZNode + " must wait");
+
+                    mutex.wait();
+
+                    if (!isConnected())
+                    {
+                        logger.info("ZooKeeper disconnected while waiting for lock");
+                        throw new KeeperException.ConnectionLossException();
+                    }
+                }
+            }
+        }
+        finally
+        {
+            // never leave an unowned znode queued: it would eventually block every later locker
+            if (!acquired)
+                abandonLockZNode(lockZNode);
+        }
+    }
+
+    /**
+     * Best-effort removal of a lock znode that was queued but never acquired. Skipped while
+     * disconnected, since a request then might block; znodes of a closed or expired session are
+     * ephemeral and go away with it. Never throws, so it cannot mask the original failure.
+     */
+    private void abandonLockZNode(String lockZNode)
+    {
+        if (lockZNode == null || !isConnected())
+            return;
+
+        try
+        {
+            zk.delete(lockZNode, -1);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+        }
+        catch (Exception e)
+        {
+            if (logger.isDebugEnabled())
+                logger.debug("Could not remove abandoned lock znode " + lockZNode, e);
+        }
+    }
+
+    /**
+     * unlock
+     *
+     * @param lockZNode this MUST be the string returned by lock call. Otherwise there will be
+     * chaos.
+     */
+    public void unlock(String lockZNode)
+    {
+        assert (lockZNode != null);
+
+        if (logger.isTraceEnabled())
+            logger.trace("deleting " + lockZNode);
+
+        // unlock always succeeds from the client's point of view; failures are only logged,
+        // because the ephemeral znode disappears with its session anyway.
+        try
+        {
+            zk.delete(lockZNode, -1);
+        }
+        catch (NoNodeException ignored)
+        {
+            // already gone, e.g. the session was re-established since lock() returned
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            logger.warn("Interrupted while deleting lock znode " + lockZNode);
+        }
+        catch (Exception e)
+        {
+            logger.warn("Failed to delete lock znode " + lockZNode + "; it remains until the ZooKeeper session ends", e);
+        }
+    }
+
+}

Added: incubator/cassandra/trunk/contrib/mutex/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/storage-conf.xml?rev=901172&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/mutex/storage-conf.xml (added)
+++ incubator/cassandra/trunk/contrib/mutex/storage-conf.xml Wed Jan 20 12:57:16 2010
@@ -0,0 +1,25 @@
+<Storage>
+
+  <!-- ZooKeeper options -->
+
+  <!--
+   ~ ZooKeeper is required for the cluster-wide lock. List the ZooKeeper
+   ~ server addresses here.
+  -->
+  <ZooKeepers>
+    <ZooKeeper>127.0.0.1</ZooKeeper>
+  </ZooKeepers>
+
+  <!--
+   ~ Port that all of the ZooKeeper servers listed above listen on
+  -->
+  <ZooKeeperPort>2181</ZooKeeperPort>
+
+  <!-- 
+   ~ Name of the znode under which locks are created inside ZooKeeper. This
+   ~ must not include a slash (/) or any character disallowed by ZooKeeper (see
+   ~ hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkDataModel)
+   -->
+  <ZooKeeperRoot>locks</ZooKeeperRoot>
+
+</Storage>