You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ja...@apache.org on 2010/01/20 13:57:17 UTC
svn commit: r901172 - in /incubator/cassandra/trunk/contrib/mutex: ./ lib/
src/ src/org/ src/org/apache/ src/org/apache/cassandra/
src/org/apache/cassandra/mutex/
Author: jaakko
Date: Wed Jan 20 12:57:16 2010
New Revision: 901172
URL: http://svn.apache.org/viewvc?rev=901172&view=rev
Log:
Add cluster-wide mutex. Patch by jaakko, reviewed by jbellis. CASSANDRA-704
Added:
incubator/cassandra/trunk/contrib/mutex/
incubator/cassandra/trunk/contrib/mutex/README
incubator/cassandra/trunk/contrib/mutex/cassandra.patch
incubator/cassandra/trunk/contrib/mutex/lib/
incubator/cassandra/trunk/contrib/mutex/lib/zookeeper-3.2.2.jar (with props)
incubator/cassandra/trunk/contrib/mutex/src/
incubator/cassandra/trunk/contrib/mutex/src/org/
incubator/cassandra/trunk/contrib/mutex/src/org/apache/
incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/
incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/
incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java
incubator/cassandra/trunk/contrib/mutex/storage-conf.xml
Added: incubator/cassandra/trunk/contrib/mutex/README
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/README?rev=901172&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/mutex/README (added)
+++ incubator/cassandra/trunk/contrib/mutex/README Wed Jan 20 12:57:16 2010
@@ -0,0 +1,9 @@
+Provides a cluster-wide mutex backed by ZooKeeper. To use it, you need to
+
+(1) apply cassandra.patch to support relevant configuration options
+
+(2) add zookeeper configuration (storage-conf.xml) to your cassandra
+configuration file
+
+(3) copy src/org/apache/cassandra/mutex/ClusterMutex.java to cassandra's
+src/java/org/apache/cassandra/mutex/ and put lib/zookeeper-3.2.2.jar on its classpath
Added: incubator/cassandra/trunk/contrib/mutex/cassandra.patch
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/cassandra.patch?rev=901172&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/mutex/cassandra.patch (added)
+++ incubator/cassandra/trunk/contrib/mutex/cassandra.patch Wed Jan 20 12:57:16 2010
@@ -0,0 +1,50 @@
+diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+index 426544b..28a53f6 100644
+--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
++++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+@@ -70,6 +70,9 @@ public class DatabaseDescriptor
+ private static int replicationFactor_ = 3;
+ private static long rpcTimeoutInMillis_ = 2000;
+ private static Set<InetAddress> seeds_ = new HashSet<InetAddress>();
++ private static String zooKeeperRoot_ = "locks";
++ private static String zooKeeperPort_ = "2181";
++ private static Set<String> zooKeepers_ = new HashSet<String>();
+ /* Keeps the list of data file directories */
+ private static String[] dataFileDirectories_;
+ /* Current index into the above list of directories */
+@@ -579,6 +582,13 @@ public class DatabaseDescriptor
+ {
+ seeds_.add(InetAddress.getByName(seeds[i]));
+ }
++
++ /* zookeepers */
++ zooKeeperRoot_ = xmlUtils.getNodeValue("/Storage/ZooKeeperRoot");
++ zooKeeperPort_ = xmlUtils.getNodeValue("/Storage/ZooKeeperPort");
++ String[] zooKeepers = xmlUtils.getNodeValues("/Storage/ZooKeepers/ZooKeeper");
++ for (int i=0; i<zooKeepers.length; ++i)
++ zooKeepers_.add(zooKeepers[i]);
+ }
+ catch (ConfigurationException e)
+ {
+@@ -891,6 +901,21 @@ public class DatabaseDescriptor
+ return seeds_;
+ }
+
++ public static String getZooKeeperRoot()
++ {
++ return zooKeeperRoot_;
++ }
++
++ public static String getZooKeeperPort()
++ {
++ return zooKeeperPort_;
++ }
++
++ public static Set<String> getZooKeepers()
++ {
++ return zooKeepers_;
++ }
++
+ public static String getColumnFamilyType(String tableName, String cfName)
+ {
+ assert tableName != null;
Added: incubator/cassandra/trunk/contrib/mutex/lib/zookeeper-3.2.2.jar
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/lib/zookeeper-3.2.2.jar?rev=901172&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/cassandra/trunk/contrib/mutex/lib/zookeeper-3.2.2.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Added: incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java?rev=901172&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java (added)
+++ incubator/cassandra/trunk/contrib/mutex/src/org/apache/cassandra/mutex/ClusterMutex.java Wed Jan 20 12:57:16 2010
@@ -0,0 +1,455 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.mutex;
+
+import java.util.List;
+import java.io.IOException;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+
+import org.apache.log4j.Logger;
+
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+/**
+ * Cluster-wide mutex built on ZooKeeper's ephemeral-sequential lock recipe.
+ *
+ * Every lock() call creates an EPHEMERAL_SEQUENTIAL znode under
+ * {@code /<root>/<lockName>}. The caller whose znode has the smallest sequence
+ * number holds the lock; every other caller watches the znode immediately before
+ * its own and waits for it to be deleted. Because the znodes are ephemeral, a lock
+ * is also released when the session that created it is closed.
+ *
+ * The single instance is created, and ZooKeeper contacted, on the first call to
+ * instance(); nodes that never use the mutex need no ZooKeeper ensemble.
+ */
+public class ClusterMutex implements Watcher
+{
+    private static final Logger logger = Logger.getLogger(ClusterMutex.class);
+
+    // Used when storage-conf.xml has no ZooKeeperRoot / ZooKeeperPort. Same values as
+    // the DatabaseDescriptor defaults, which its config loader may overwrite with null
+    // when the element is missing (NOTE(review): depends on XMLUtils.getNodeValue).
+    private static final String DEFAULT_ROOT = "locks";
+    private static final String DEFAULT_PORT = "2181";
+
+    // Prefix of every lock znode; ZooKeeper appends the sequence number to it. It must
+    // end with a hyphen (-): sequenceNumber() parses everything after the last '-'.
+    private static final String LOCK_PREFIX = "lock-";
+
+    // If we're disconnected from the ZooKeeper server, how many times to attempt the
+    // lock operation before giving up.
+    private static final int OPERATION_RETRIES = 3;
+
+    // Base sleep between retries (ms); the actual sleep is RETRY_INTERVAL multiplied
+    // by the number of attempts made so far.
+    private static final long RETRY_INTERVAL = 500L;
+
+    // ZooKeeper session timeout (ms); also the minimum gap between forced reconnects.
+    private static final int SESSION_TIMEOUT = 3000;
+
+    private static final byte[] EMPTY_DATA = new byte[0];
+
+    // Lazy on purpose. People who do not want mutex, should not need to have to worry about ZK
+    private static class LazyHolder
+    {
+        private static final ClusterMutex clusterMutex = new ClusterMutex();
+    }
+
+    /**
+     * @return the process-wide mutex. The first call connects to ZooKeeper; if that
+     *         fails, the constructor's RuntimeException surfaces wrapped in an
+     *         ExceptionInInitializerError (later calls: NoClassDefFoundError).
+     */
+    public static ClusterMutex instance()
+    {
+        return LazyHolder.clusterMutex;
+    }
+
+    // Monitor that lock waiters park on; process() notifies it on every ZooKeeper event.
+    private final Object mutex = new Object();
+
+    // volatile: replaced by reestablishZooKeeperSession() while other threads read it.
+    private volatile ZooKeeper zk = null;
+
+    // Absolute path of the znode under which all locks live, e.g. "/locks".
+    private final String root;
+
+    // ZooKeeper connect string: comma-separated host:port pairs.
+    private final String hostString;
+
+    // Time (ms) of the last forced reconnect; only accessed in synchronized methods.
+    private long lastConnect = 0;
+
+    private ClusterMutex()
+    {
+        String zooKeeperRoot = DatabaseDescriptor.getZooKeeperRoot();
+        // a bare "/" root would produce invalid lock paths such as "//name"
+        if (zooKeeperRoot == null || zooKeeperRoot.isEmpty())
+            zooKeeperRoot = DEFAULT_ROOT;
+        this.root = "/" + zooKeeperRoot;
+
+        String zooKeeperPort = DatabaseDescriptor.getZooKeeperPort();
+        if (zooKeeperPort == null || zooKeeperPort.isEmpty())
+            zooKeeperPort = DEFAULT_PORT;
+
+        StringBuilder hosts = new StringBuilder();
+        for (String zooKeeper : DatabaseDescriptor.getZooKeepers())
+        {
+            if (hosts.length() > 0)
+                hosts.append(',');
+            hosts.append(zooKeeper).append(':').append(zooKeeperPort);
+        }
+        this.hostString = hosts.toString();
+        if (hostString.isEmpty())
+            throw new RuntimeException("ClusterMutex initialization failed: no ZooKeeper servers configured");
+        if (logger.isDebugEnabled())
+            logger.debug("ZooKeeper connect string: " + hostString);
+
+        try
+        {
+            connectZooKeeper();
+            if (zk.exists(root, false) == null)
+            {
+                logger.info("Mutex root " + root + " does not exists, creating");
+                createPersistentIfAbsent(root);
+            }
+        }
+        catch (Exception e)
+        {
+            // keep the cause so the underlying ZooKeeper/IO failure is not lost
+            throw new RuntimeException("ClusterMutex initialization failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Connect to the ZooKeeper servers unless a session that is not CLOSED already
+     * exists. Events for the session are delivered to process().
+     */
+    private synchronized void connectZooKeeper() throws IOException
+    {
+        if (zk != null && zk.getState() != ZooKeeper.States.CLOSED)
+            return;
+        logger.info("Connecting to ZooKeepers: " + hostString);
+        zk = new ZooKeeper(hostString, SESSION_TIMEOUT, this);
+    }
+
+    /**
+     * Close the current session, so that its ephemeral lock znodes are deleted, and
+     * connect again. Does nothing if the previous forced reconnect happened less than
+     * SESSION_TIMEOUT ms ago, so that retries do not flood ZooKeeper.
+     */
+    private synchronized void reestablishZooKeeperSession() throws IOException
+    {
+        long now = System.currentTimeMillis();
+
+        if ((now - lastConnect) < SESSION_TIMEOUT)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Only " + (now - lastConnect) + "ms passed since last reconnect, not trying again yet");
+            return;
+        }
+
+        lastConnect = now;
+
+        try
+        {
+            zk.close();
+        }
+        catch (InterruptedException e)
+        {
+            // restore the interrupt flag rather than swallowing it
+            Thread.currentThread().interrupt();
+        }
+        catch (Exception e)
+        {
+            // ignore all other exceptions. we're calling this just to make sure ephemeral
+            // nodes are deleted. zk might be in an inconsistent state and cause exception.
+        }
+
+        connectZooKeeper();
+    }
+
+    /**
+     * Watcher callback for every ZooKeeper event: session state changes as well as the
+     * exists() watches set in lockInternal(). All waiters are woken and each re-checks
+     * its own state, so the event needs no inspection; the number of waiters is
+     * usually very small (most likely just one).
+     */
+    public void process(WatchedEvent event)
+    {
+        if (logger.isTraceEnabled())
+            logger.trace("Got event " + event.getType() + ", keeper state " + event.getState() + ", path " + event.getPath());
+
+        synchronized (mutex)
+        {
+            mutex.notifyAll();
+        }
+    }
+
+    private boolean isConnected()
+    {
+        return zk.getState() == ZooKeeper.States.CONNECTED;
+    }
+
+    /**
+     * Acquire the cluster-wide lock lockName, blocking until it is held.
+     *
+     * If the ZooKeeper session expires or the connection is lost, a reconnect is
+     * attempted and the lock retried, up to OPERATION_RETRIES attempts in total.
+     *
+     * @param lockName lock to be locked for writing. name can be any string, but it must not
+     *                 include slash (/) or any character disallowed by ZooKeeper (see
+     *                 hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkDataModel).
+     * @return name of the znode inside zookeeper holding this lock; pass it to unlock().
+     * @throws KeeperException ConnectionLossException once every attempt has failed, or
+     *         any other ZooKeeper error, which is not retried
+     * @throws InterruptedException if interrupted while waiting for the lock or between retries
+     * @throws IOException if re-creating the ZooKeeper client fails
+     */
+    public String lock(String lockName) throws KeeperException, InterruptedException, IOException
+    {
+        for (int attempt = 1; attempt <= OPERATION_RETRIES; attempt++)
+        {
+            try
+            {
+                return lockInternal(lockName);
+            }
+            catch (KeeperException.SessionExpiredException e)
+            {
+                logger.warn("ZooKeeper session expired, reconnecting");
+                reestablishZooKeeperSession();
+            }
+            catch (KeeperException.ConnectionLossException e)
+            {
+                // ZooKeeper handles lost connection automatically, but in order to reset all
+                // ephemeral nodes, we close the whole thing.
+                logger.warn("ZooKeeper connection lost, reconnecting");
+                reestablishZooKeeperSession();
+            }
+
+            // Back off before the next attempt (not after the last one). An interrupt
+            // here propagates to the caller instead of being silently swallowed.
+            if (attempt < OPERATION_RETRIES)
+                Thread.sleep(RETRY_INTERVAL * attempt);
+        }
+
+        throw new KeeperException.ConnectionLossException();
+    }
+
+    /**
+     * Create an empty persistent znode at path unless it already exists. Another
+     * client may be creating the same node concurrently, so losing that race
+     * (NodeExistsException) is not an error.
+     */
+    private void createPersistentIfAbsent(String path) throws KeeperException, InterruptedException
+    {
+        try
+        {
+            zk.create(path, EMPTY_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+        catch (NodeExistsException e)
+        {
+            // someone else created it first; it exists, which is all we need
+        }
+    }
+
+    /**
+     * Create this client's lock znode under lockPath, creating lockPath itself first if
+     * it does not exist yet. The znode name is LOCK_PREFIX followed by the sequence
+     * number ZooKeeper assigns.
+     *
+     * @param lockPath name of the lock (directory in zookeeper)
+     * @return full path of the created znode
+     */
+    private String createLockZNode(String lockPath) throws KeeperException, InterruptedException
+    {
+        String zNodePrefix = lockPath + "/" + LOCK_PREFIX;
+        try
+        {
+            return zk.create(zNodePrefix, EMPTY_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+        }
+        catch (NoNodeException e)
+        {
+            logger.info(lockPath + " does not exist, creating");
+            createPersistentIfAbsent(lockPath);
+            return zk.create(zNodePrefix, EMPTY_DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+        }
+    }
+
+    /**
+     * Sequence number ZooKeeper appended to a lock znode name or path: the digits
+     * after the last '-'.
+     */
+    private static int sequenceNumber(String zNode)
+    {
+        return Integer.parseInt(zNode.substring(zNode.lastIndexOf('-') + 1));
+    }
+
+    /**
+     * lockInternal does the actual locking: it queues a lock znode and blocks until that
+     * znode has the smallest sequence number under the lock path.
+     *
+     * If it exits without acquiring the lock (exception or interrupt), its znode is
+     * deleted on a best-effort basis, so that a retry in the same session does not end
+     * up queued behind its own orphaned znode.
+     *
+     * @param lockName same as in lock
+     * @return full path of the lock znode now holding the lock
+     */
+    private String lockInternal(String lockName) throws KeeperException, InterruptedException
+    {
+        String lockPath = root + "/" + lockName;
+        String lockZNode = createLockZNode(lockPath);
+
+        if (logger.isTraceEnabled())
+            logger.trace("lockZNode created " + lockZNode);
+
+        boolean acquired = false;
+        try
+        {
+            while (true)
+            {
+                // our znode name relative to lockPath, and the ID ZooKeeper gave it
+                String myName = lockZNode.substring(lockZNode.lastIndexOf('/') + 1);
+                int mySeqNum = sequenceNumber(myName);
+                int previousSeqNum = -1;
+                String predecessor = null;
+
+                // Get all children of the lock znode and find the one just before us, if
+                // any. This must be inside the loop: children may be deleted out of order
+                // because of client disconnects, so the znode in front of us this time may
+                // be gone next time even though earlier znodes are still there.
+                List<String> children = zk.getChildren(lockPath, false);
+                if (!children.contains(myName))
+                {
+                    logger.warn(lockZNode + " is not among the children of " + lockPath + ". Going to try again");
+                    lockZNode = createLockZNode(lockPath);
+                    continue;
+                }
+                for (String child : children)
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("child: " + child);
+                    int otherSeqNum = sequenceNumber(child);
+                    if (otherSeqNum < mySeqNum && otherSeqNum > previousSeqNum)
+                    {
+                        previousSeqNum = otherSeqNum;
+                        predecessor = child;
+                    }
+                }
+
+                // our sequence number is smallest, we have the lock
+                if (predecessor == null)
+                {
+                    if (logger.isTraceEnabled())
+                        logger.trace("No smaller znode sequences, " + lockZNode + " acquired lock");
+                    acquired = true;
+                    return lockZNode;
+                }
+
+                // There is at least one znode before us; wait for it to be deleted. The
+                // watch is set and the wait entered while holding mutex, and process()
+                // needs mutex to notify, so the deletion event cannot slip in between.
+                synchronized (mutex)
+                {
+                    if (zk.exists(lockPath + "/" + predecessor, true) == null)
+                    {
+                        // The predecessor is gone, but an earlier znode may still hold the
+                        // lock (deletions can happen out of order): re-read the children
+                        // instead of assuming we own the lock.
+                        if (logger.isTraceEnabled())
+                            logger.trace(predecessor + " does not exists, re-checking whether " + lockZNode + " holds the lock");
+                        continue;
+                    }
+                    if (logger.isTraceEnabled())
+                        logger.trace(predecessor + " is still here, " + lockZNode + " must wait");
+
+                    mutex.wait();
+
+                    if (!isConnected())
+                    {
+                        logger.info("ZooKeeper disconnected while waiting for lock");
+                        throw new KeeperException.ConnectionLossException();
+                    }
+                }
+            }
+        }
+        finally
+        {
+            if (!acquired)
+                deleteQuietly(lockZNode);
+        }
+    }
+
+    /**
+     * unlock
+     *
+     * Release a lock obtained from lock(). Always "succeeds" from the caller's point of
+     * view: errors are checked when locking, and ephemeral znodes are removed by
+     * ZooKeeper when their session ends, so a failed delete should not break anything.
+     *
+     * @param lockZNode this MUST be the string returned by lock call. Otherwise there will be
+     *                  chaos.
+     */
+    public void unlock(String lockZNode)
+    {
+        assert (lockZNode != null);
+
+        if (logger.isTraceEnabled())
+            logger.trace("deleting " + lockZNode);
+
+        deleteQuietly(lockZNode);
+    }
+
+    /**
+     * Best-effort delete of a lock znode. Exceptions are not propagated: a znode that is
+     * already gone is ignored, other failures are logged, and an interrupt is re-asserted.
+     */
+    private void deleteQuietly(String lockZNode)
+    {
+        try
+        {
+            zk.delete(lockZNode, -1);
+        }
+        catch (NoNodeException e)
+        {
+            // already deleted (e.g. its session was closed); nothing left to do
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            logger.warn("Interrupted while deleting " + lockZNode);
+        }
+        catch (Exception e)
+        {
+            logger.warn("Could not delete " + lockZNode + ": " + e);
+        }
+    }
+
+}
Added: incubator/cassandra/trunk/contrib/mutex/storage-conf.xml
URL: http://svn.apache.org/viewvc/incubator/cassandra/trunk/contrib/mutex/storage-conf.xml?rev=901172&view=auto
==============================================================================
--- incubator/cassandra/trunk/contrib/mutex/storage-conf.xml (added)
+++ incubator/cassandra/trunk/contrib/mutex/storage-conf.xml Wed Jan 20 12:57:16 2010
@@ -0,0 +1,25 @@
+<Storage>
+
+ <!-- ZooKeeper options -->
+
+ <!--
+ ~ ZooKeeper is required for the cluster-wide lock. List the ZooKeeper server
+ ~ addresses here.
+ -->
+ <ZooKeepers>
+ <ZooKeeper>127.0.0.1</ZooKeeper>
+ </ZooKeepers>
+
+ <!--
+ ~ Port on which the ZooKeeper servers listen
+ -->
+ <ZooKeeperPort>2181</ZooKeeperPort>
+
+ <!--
+ ~ Name of the top-level ZooKeeper node under which locks are created. This must not
+ ~ include slash (/) or any characters disallowed by ZooKeeper (see
+ ~ hadoop.apache.org/zookeeper/docs/current/zookeeperProgrammers.html#ch_zkDataModel)
+ -->
+ <ZooKeeperRoot>locks</ZooKeeperRoot>
+
+</Storage>