You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/07/01 00:53:29 UTC

svn commit: r1141746 [2/3] - in /zookeeper/trunk: ./ src/ src/c/ src/c/include/ src/c/src/ src/c/tests/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/main/org/a...

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java?rev=1141746&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/Op.java Thu Jun 30 22:53:28 2011
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.jute.Record;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.proto.CheckVersionRequest;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * Represents a single operation in a multi-operation transaction.  Each operation can be a create, update
+ * or delete or can just be a version check.
+ *
+ * Sub-classes of Op each represent each detailed type but should not normally be referenced except via
+ * the provided factory methods.
+ *
+ * @see ZooKeeper#create(String, byte[], java.util.List, CreateMode)
+ * @see ZooKeeper#create(String, byte[], java.util.List, CreateMode, org.apache.zookeeper.AsyncCallback.StringCallback, Object)
+ * @see ZooKeeper#delete(String, int)
+ * @see ZooKeeper#setData(String, byte[], int)
+ */
+public abstract class Op {
+    private int type;
+    private String path;
+
+    // prevent untyped construction
+    private Op(int type, String path) {
+        this.type = type;
+        this.path = path;
+    }
+
+    /**
+     * Constructs a create operation.  Arguments are as for the ZooKeeper method of the same name.
+     * @see ZooKeeper#create(String, byte[], java.util.List, CreateMode)
+     * @see CreateMode#fromFlag(int)
+     *
+     * @param path
+     *                the path for the node
+     * @param data
+     *                the initial data for the node
+     * @param acl
+     *                the acl for the node
+     * @param flags
+     *                specifying whether the node to be created is ephemeral
+     *                and/or sequential but using the integer encoding.
+     * @return a create operation holding the given arguments
+     */
+    public static Op create(String path, byte[] data, List<ACL> acl, int flags) {
+        return new Create(path, data, acl, flags);
+    }
+
+    /**
+     * Constructs a create operation.  Arguments are as for the ZooKeeper method of the same name.
+     * @see ZooKeeper#create(String, byte[], java.util.List, CreateMode)
+     *
+     * @param path
+     *                the path for the node
+     * @param data
+     *                the initial data for the node
+     * @param acl
+     *                the acl for the node
+     * @param createMode
+     *                specifying whether the node to be created is ephemeral
+     *                and/or sequential
+     * @return a create operation holding the given arguments
+     */
+    public static Op create(String path, byte[] data, List<ACL> acl, CreateMode createMode) {
+        return new Create(path, data, acl, createMode);
+    }
+
+    /**
+     * Constructs a delete operation.  Arguments are as for the ZooKeeper method of the same name.
+     * @see ZooKeeper#delete(String, int)
+     *
+     * @param path
+     *                the path of the node to be deleted.
+     * @param version
+     *                the expected node version.
+     * @return a delete operation holding the given arguments
+     */
+    public static Op delete(String path, int version) {
+        return new Delete(path, version);
+    }
+
+    /**
+     * Constructs an update operation.  Arguments are as for the ZooKeeper method of the same name.
+     * @see ZooKeeper#setData(String, byte[], int)
+     *
+     * @param path
+     *                the path of the node
+     * @param data
+     *                the data to set
+     * @param version
+     *                the expected matching version
+     * @return a setData operation holding the given arguments
+     */
+    public static Op setData(String path, byte[] data, int version) {
+        return new SetData(path, data, version);
+    }
+
+
+    /**
+     * Constructs a version check operation.  Arguments are as for the ZooKeeper.setData method except that
+     * no data is provided since no update is intended.  The purpose for this is to allow read-modify-write
+     * operations that apply to multiple znodes, but where some of the znodes are involved only in the read,
+     * not the write.  A similar effect could be achieved by writing the same data back, but that leads to
+     * way more version updates than are necessary and more writing in general.
+     *
+     * @param path
+     *                the path of the node
+     * @param version
+     *                the expected matching version
+     * @return a check operation holding the given path and version
+     */
+    public static Op check(String path, int version) {
+        return new Check(path, version);
+    }
+
+    /**
+     * Gets the integer type code for an Op.  This code should be as from ZooDefs.OpCode
+     * @see ZooDefs.OpCode
+     * @return  The type code.
+     */
+    public int getType() {
+        return type;
+    }
+
+    /**
+     * Gets the path for an Op.
+     * @return  The path.
+     */
+    public String getPath() {
+        return path;
+    }
+
+    /**
+     * Encodes an op for wire transmission.
+     * @return An appropriate Record structure.
+     */
+    public abstract Record toRequestRecord() ;
+
+    //////////////////
+    // these internal classes are public, but should not generally be referenced.
+    //
+    public static class Create extends Op {
+        private byte[] data;
+        private List<ACL> acl;
+        private int flags;
+
+        private Create(String path, byte[] data, List<ACL> acl, int flags) {
+            super(ZooDefs.OpCode.create, path);
+            this.data = data;
+            this.acl = acl;
+            this.flags = flags;
+        }
+
+        private Create(String path, byte[] data, List<ACL> acl, CreateMode createMode) {
+            super(ZooDefs.OpCode.create, path);
+            this.data = data;
+            this.acl = acl;
+            this.flags = createMode.toFlag();
+        }
+
+        /**
+         * Two create operations are equal when their path, data, flags and ACL lists
+         * (same entries in the same order) all match.
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Create)) return false;
+
+            Create op = (Create) o;
+
+            // The path takes part in hashCode(), so it must take part in equality as well,
+            // exactly as it does for Delete, SetData and Check.
+            return getType() == op.getType()
+                   && getPath().equals(op.getPath())
+                   && flags == op.flags
+                   && Arrays.equals(data, op.data)
+                   && sameAcls(acl, op.acl);
+        }
+
+        /**
+         * Compares two ACL lists entry by entry, in iteration order.  Two null lists
+         * are considered equal; a null list never equals a non-null one.
+         */
+        private static boolean sameAcls(List<ACL> mine, List<ACL> theirs) {
+            if (mine == theirs) return true;
+            if (mine == null || theirs == null) return false;
+
+            // Walk this op's list and the other op's list in lock step; the lists match
+            // only if every pair is equal and both run out at the same time.
+            Iterator<ACL> i = mine.iterator();
+            Iterator<ACL> j = theirs.iterator();
+            while (i.hasNext() && j.hasNext()) {
+                if (!i.next().equals(j.next())) {
+                    return false;
+                }
+            }
+            return !i.hasNext() && !j.hasNext();
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() + getPath().hashCode() + Arrays.hashCode(data);
+        }
+
+        @Override
+        public Record toRequestRecord() {
+            return new CreateRequest(getPath(), data, acl, flags);
+        }
+    }
+
+    public static class Delete extends Op {
+        private int version;
+
+        private Delete(String path, int version) {
+            super(ZooDefs.OpCode.delete, path);
+            this.version = version;
+        }
+
+        /** Two delete operations are equal when their path and expected version match. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Delete)) return false;
+
+            Delete op = (Delete) o;
+
+            return getType() == op.getType() && version == op.version 
+                   && getPath().equals(op.getPath());
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() + getPath().hashCode() + version;
+        }
+
+        @Override
+        public Record toRequestRecord() {
+            return new DeleteRequest(getPath(), version);
+        }
+    }
+
+    public static class SetData extends Op {
+        private byte[] data;
+        private int version;
+
+        private SetData(String path, byte[] data, int version) {
+            super(ZooDefs.OpCode.setData, path);
+            this.data = data;
+            this.version = version;
+        }
+
+        /** Two setData operations are equal when their path, data and expected version match. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof SetData)) return false;
+
+            SetData op = (SetData) o;
+
+            return getType() == op.getType() && version == op.version 
+                   && getPath().equals(op.getPath()) && Arrays.equals(data, op.data);
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() + getPath().hashCode() + Arrays.hashCode(data) + version;
+        }
+
+        @Override
+        public Record toRequestRecord() {
+            return new SetDataRequest(getPath(), data, version);
+        }
+    }
+
+    public static class Check extends Op {
+        private int version;
+
+        private Check(String path, int version) {
+            super(ZooDefs.OpCode.check, path);
+            this.version = version;
+        }
+
+        /** Two check operations are equal when their path and expected version match. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Check)) return false;
+
+            Check op = (Check) o;
+
+            return getType() == op.getType() && getPath().equals(op.getPath()) && version == op.version;
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() + getPath().hashCode() + version;
+        }
+
+        @Override
+        public Record toRequestRecord() {
+            return new CheckVersionRequest(getPath(), version);
+        }
+    }
+}

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/OpResult.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/OpResult.java?rev=1141746&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/OpResult.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/OpResult.java Thu Jun 30 22:53:28 2011
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Encodes the result of a single part of a multiple operation commit.
+ */
+public class OpResult {
+    private int type;
+
+    private OpResult(int type) {
+        this.type = type;
+    }
+
+    /**
+     * Encodes the return type as from ZooDefs.OpCode.  Can be used
+     * to dispatch to the correct cast needed for getting the desired
+     * additional result data.
+     * @see ZooDefs.OpCode
+     * @return an integer identifying what kind of operation this result came from.
+     */
+    public int getType() {
+        return type;
+    }
+
+    /**
+     * A result from a create operation.  This kind of result allows the
+     * path to be retrieved since the create might have been a sequential
+     * create.
+     */
+    public static class CreateResult extends OpResult {
+        private String path;
+
+        public CreateResult(String path) {
+            super(ZooDefs.OpCode.create);
+            this.path = path;
+        }
+
+        public String getPath() {
+            return path;
+        }
+
+        /** Two create results are equal when they carry the same path. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            // Guard on the concrete subclass: any other kind of OpResult must compare
+            // unequal rather than fail the cast below with a ClassCastException.
+            if (!(o instanceof CreateResult)) return false;
+
+            CreateResult other = (CreateResult) o;
+            return getType() == other.getType() && path.equals(other.path);
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() * 35 + path.hashCode();
+        }
+    }
+
+    /**
+     * A result from a delete operation.  No special values are available.
+     */
+    public static class DeleteResult extends OpResult {
+        public DeleteResult() {
+            super(ZooDefs.OpCode.delete);
+        }
+
+        /** All delete results are equal to one another. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof DeleteResult)) return false;
+
+            DeleteResult other = (DeleteResult) o;
+            return getType() == other.getType();
+        }
+
+        @Override
+        public int hashCode() {
+            return getType();
+        }
+    }
+
+    /**
+     * A result from a setData operation.  This kind of result provides access
+     * to the Stat structure from the update.
+     */
+    public static class SetDataResult extends OpResult {
+        private Stat stat;
+
+        public SetDataResult(Stat stat) {
+            super(ZooDefs.OpCode.setData);
+            this.stat = stat;
+        }
+
+        public Stat getStat() {
+            return stat;
+        }
+
+        /** Two setData results are equal when their Stat structures report the same mzxid. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            // Guard on the concrete subclass so a different result kind is simply unequal.
+            if (!(o instanceof SetDataResult)) return false;
+
+            SetDataResult other = (SetDataResult) o;
+            return getType() == other.getType() && stat.getMzxid() == other.stat.getMzxid();
+        }
+
+        @Override
+        public int hashCode() {
+            return (int) (getType() * 35 + stat.getMzxid());
+        }
+    }
+
+    /**
+     * A result from a version check operation.  No special values are available.
+     */
+    public static class CheckResult extends OpResult {
+        public CheckResult() {
+            super(ZooDefs.OpCode.check);
+        }
+
+        /** All check results are equal to one another. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            // Guard on the concrete subclass so a different result kind is simply unequal.
+            if (!(o instanceof CheckResult)) return false;
+
+            CheckResult other = (CheckResult) o;
+            return getType() == other.getType();
+        }
+
+        @Override
+        public int hashCode() {
+            return getType();
+        }
+    }
+
+    /**
+     * An error result from any kind of operation.  The point of error results
+     * is that they contain an error code which helps understand what happened.
+     * @see KeeperException.Code
+     *
+     */
+    public static class ErrorResult extends OpResult {
+        private int err;
+
+        public ErrorResult(int err) {
+            super(ZooDefs.OpCode.error);
+            this.err = err;
+        }
+
+        public int getErr() {
+            return err;
+        }
+
+        /** Two error results are equal when they carry the same error code. */
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            // Guard on the concrete subclass so a different result kind is simply unequal.
+            if (!(o instanceof ErrorResult)) return false;
+
+            ErrorResult other = (ErrorResult) o;
+            return getType() == other.getType() && err == other.getErr();
+        }
+
+        @Override
+        public int hashCode() {
+            return getType() * 35 + err;
+        }
+    }
+}

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/Transaction.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/Transaction.java?rev=1141746&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/Transaction.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/Transaction.java Thu Jun 30 22:53:28 2011
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+/**
+ * Provides a builder style interface for doing multiple updates.  This is
+ * really just a thin layer on top of Zookeeper.multi().
+ */
+public class Transaction {
+    private ZooKeeper zk;
+    private MultiTransactionRecord request = new MultiTransactionRecord();
+
+    protected Transaction(ZooKeeper zk) {
+        this.zk = zk;
+    }
+
+    public Transaction create(final String path, byte data[], List<ACL> acl,
+                              CreateMode createMode) {
+        request.add(Op.create(path, data, acl, createMode.toFlag()));
+        return this;
+    }
+
+    public Transaction delete(final String path, int version) {
+        request.add(Op.delete(path, version));
+        return this;
+    }
+
+    public Transaction check(String path, int version) {
+        request.add(Op.check(path, version));
+        return this;
+    }
+
+    public Transaction setData(final String path, byte data[], int version) {
+        request.add(Op.setData(path, data, version));
+        return this;
+    }
+
+    public List<OpResult> commit() throws InterruptedException, KeeperException {
+        return zk.multiInternal(request);
+    }
+}

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java Thu Jun 30 22:53:28 2011
@@ -50,6 +50,10 @@ public class ZooDefs {
 
         public final int getChildren2 = 12;
 
+        public final int check = 13;
+
+        public final int multi = 14;
+
         public final int auth = 100;
 
         public final int setWatches = 101;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Thu Jun 30 22:53:28 2011
@@ -18,53 +18,22 @@
 
 package org.apache.zookeeper;
 
-import java.io.IOException;
-import java.net.SocketAddress;
-import java.util.ArrayList;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.zookeeper.AsyncCallback.ACLCallback;
-import org.apache.zookeeper.AsyncCallback.Children2Callback;
-import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
-import org.apache.zookeeper.AsyncCallback.DataCallback;
-import org.apache.zookeeper.AsyncCallback.StatCallback;
-import org.apache.zookeeper.AsyncCallback.StringCallback;
-import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.AsyncCallback.*;
+import org.apache.zookeeper.OpResult.ErrorResult;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.StaticHostProvider;
 import org.apache.zookeeper.common.PathUtils;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.proto.CreateResponse;
-import org.apache.zookeeper.proto.DeleteRequest;
-import org.apache.zookeeper.proto.ExistsRequest;
-import org.apache.zookeeper.proto.GetACLRequest;
-import org.apache.zookeeper.proto.GetACLResponse;
-import org.apache.zookeeper.proto.GetChildren2Request;
-import org.apache.zookeeper.proto.GetChildren2Response;
-import org.apache.zookeeper.proto.GetChildrenRequest;
-import org.apache.zookeeper.proto.GetChildrenResponse;
-import org.apache.zookeeper.proto.GetDataRequest;
-import org.apache.zookeeper.proto.GetDataResponse;
-import org.apache.zookeeper.proto.ReplyHeader;
-import org.apache.zookeeper.proto.RequestHeader;
-import org.apache.zookeeper.proto.SetACLRequest;
-import org.apache.zookeeper.proto.SetACLResponse;
-import org.apache.zookeeper.proto.SetDataRequest;
-import org.apache.zookeeper.proto.SetDataResponse;
-import org.apache.zookeeper.proto.SyncRequest;
-import org.apache.zookeeper.proto.SyncResponse;
+import org.apache.zookeeper.proto.*;
 import org.apache.zookeeper.server.DataTree;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.util.*;
 
 /**
  * This is the main class of ZooKeeper client library. To use a ZooKeeper
@@ -885,7 +854,61 @@ public class ZooKeeper {
         }
     }
 
-    
+    /**
+     * Executes multiple Zookeeper operations or none of them.  On success, a list of results is returned.
+     * On failure, only a single exception is returned.  If you want more details, it may be preferable to
+     * use the alternative form of this method that lets you pass a list into which individual results are
+     * placed so that you can zero in on exactly which operation failed and why.
+     * <p>
+     * The maximum allowable size of all of the data arrays in all of the setData operations in this single
+     * request is 1 MB (1,048,576 bytes).
+     * Requests larger than this will cause a KeeperExecption to be thrown.
+     * @param ops  An iterable that contains the operations to be done.  These should be created using the
+     * factory methods on Op.
+     * @see Op
+     * @return A list of results.
+     * @throws InterruptedException  If the operation was interrupted.  The operation may or may not have succeeded, but
+     * will not have partially succeeded if this exception is thrown.
+     * @throws KeeperException If the operation could not be completed due to some error in doing one of the specified
+     * ops.
+     */
+    public List<OpResult> multi(Iterable<Op> ops) throws InterruptedException, KeeperException {
+        return multiInternal(new MultiTransactionRecord(ops));
+    }
+
+    protected List<OpResult> multiInternal(MultiTransactionRecord request)
+        throws InterruptedException, KeeperException {
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.multi);
+        MultiResponse response = new MultiResponse();
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
+        if (r.getErr() != 0) {
+            throw KeeperException.create(KeeperException.Code.get(r.getErr()));
+        }
+
+        List<OpResult> results = response.getResultList();
+        
+        ErrorResult fatalError = null;
+        for (OpResult result : results) {
+            if (result instanceof ErrorResult && ((ErrorResult)result).getErr() != KeeperException.Code.OK.intValue()) {
+                fatalError = (ErrorResult) result;
+                break;
+            }
+        }
+
+        if (fatalError != null) {
+            KeeperException ex = KeeperException.create(KeeperException.Code.get(fatalError.getErr()));
+            ex.setMultiResults(results);
+            throw ex;
+        }
+
+        return results;
+    }
+
+    public Transaction transaction() {
+        return new Transaction(this);
+    }
+
     /**
      * Recursively delete the node with the given path. 
      * <p>
@@ -1247,7 +1270,7 @@ public class ZooKeeper {
      * thrown if the given version does not match the node's version.
      * <p>
      * The maximum allowable size of the data array is 1 MB (1,048,576 bytes).
-     * Arrays larger than this will cause a KeeperExecption to be thrown.
+     * Arrays larger than this will cause a KeeperException to be thrown.
      *
      * @param path
      *                the path of the node

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Thu Jun 30 22:53:28 2011
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,6 +30,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import java.nio.ByteBuffer;
+
 import org.apache.jute.Index;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
@@ -56,8 +59,16 @@ import org.apache.zookeeper.txn.DeleteTx
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.Txn;
+import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetACLRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+
 /**
  * This class maintains the tree data structure. It doesn't have any networking
  * or client connection code in it so that it can be tested in a stand alone
@@ -736,6 +747,8 @@ public class DataTree {
 
         public Stat stat;
 
+        public List<ProcessTxnResult> multiResult;
+        
         /**
          * Equality is defined as the clientId and the cxid being the same. This
          * allows us to use hash tables to track completion of transactions.
@@ -766,7 +779,8 @@ public class DataTree {
 
     public volatile long lastProcessedZxid = 0;
 
-    public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
+    public ProcessTxnResult processTxn(TxnHeader header, Record txn)
+    {
         ProcessTxnResult rc = new ProcessTxnResult();
 
         String debug = "";
@@ -776,6 +790,7 @@ public class DataTree {
             rc.zxid = header.getZxid();
             rc.type = header.getType();
             rc.err = 0;
+            rc.multiResult = null;
             if (rc.zxid > lastProcessedZxid) {
                 lastProcessedZxid = rc.zxid;
             }
@@ -800,15 +815,16 @@ public class DataTree {
                     break;
                 case OpCode.setData:
                     SetDataTxn setDataTxn = (SetDataTxn) txn;
-                    debug = "Set data for  transaction for "
-                            + setDataTxn.getPath();
+                    debug = "Set data transaction for "
+                            + setDataTxn.getPath()
+                            + " to new value=" + Arrays.toString(setDataTxn.getData());
                     rc.stat = setData(setDataTxn.getPath(), setDataTxn
                             .getData(), setDataTxn.getVersion(), header
                             .getZxid(), header.getTime());
                     break;
                 case OpCode.setACL:
                     SetACLTxn setACLTxn = (SetACLTxn) txn;
-                    debug = "Set ACL for  transaction for "
+                    debug = "Set ACL transaction for "
                             + setACLTxn.getPath();
                     rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
                             setACLTxn.getVersion());
@@ -820,10 +836,83 @@ public class DataTree {
                     ErrorTxn errTxn = (ErrorTxn) txn;
                     rc.err = errTxn.getErr();
                     break;
+                case OpCode.check:
+                    CheckVersionTxn checkTxn = (CheckVersionTxn) txn;
+                    debug = "Check Version transaction for "
+                            + checkTxn.getPath() 
+                            + " and version="
+                            + checkTxn.getVersion();
+                    rc.path = checkTxn.getPath();
+                    break;
+                case OpCode.multi:
+                    MultiTxn multiTxn = (MultiTxn) txn ;
+                    List<Txn> txns = multiTxn.getTxns();
+                    debug = "Multi transaction with " + txns.size() + " operations";
+                    rc.multiResult = new ArrayList<ProcessTxnResult>();
+                    boolean failed = false;
+                    for (Txn subtxn : txns) {
+                        if (subtxn.getType() == OpCode.error) {
+                            failed = true;
+                            break;
+                        }
+                    }
+
+                    boolean post_failed = false;
+                    for (Txn subtxn : txns) {
+                        ByteBuffer bb = ByteBuffer.wrap(subtxn.getData());
+                        Record record = null;
+                        switch (subtxn.getType()) {
+                            case OpCode.create:
+                                record = new CreateTxn();
+                                break;
+                            case OpCode.delete:
+                                record = new DeleteTxn();
+                                break;
+                            case OpCode.setData:
+                                record = new SetDataTxn();
+                                break;
+                            case OpCode.error:
+                                record = new ErrorTxn();
+                                post_failed = true;
+                                break;
+                            case OpCode.check:
+                                record = new CheckVersionTxn();
+                                break;
+                            default:
+                                throw new IOException("Invalid type of op: " + subtxn.getType());
+                        }
+                        assert(record != null);
+
+                        ZooKeeperServer.byteBuffer2Record(bb, record);
+                       
+                        if (failed && subtxn.getType() != OpCode.error){
+                            int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue() 
+                                                 : Code.OK.intValue();
+
+                            subtxn.setType(OpCode.error);
+                            record = new ErrorTxn(ec);
+                        }
+
+                        if (failed) {
+                            assert(subtxn.getType() == OpCode.error) ;
+                        }
+
+                        TxnHeader subHdr = new TxnHeader(header.getClientId(), header.getCxid(),
+                                                         header.getZxid(), header.getTime(), 
+                                                         subtxn.getType());
+                        ProcessTxnResult subRc = processTxn(subHdr, record);
+                        rc.multiResult.add(subRc);
+                        if (subRc.err != 0 && rc.err == 0) {
+                            rc.err = subRc.err ;
+                        }
+                    }
+                    break;
             }
         } catch (KeeperException e) {
-             LOG.debug("Failed: " + debug, e);
+             LOG.warn("Failed: " + debug, e);
              rc.err = e.code().intValue();
+        } catch (IOException e) {
+            LOG.warn("Failed:" + debug, e);
         }
         return rc;
     }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Thu Jun 30 22:53:28 2011
@@ -26,6 +26,7 @@ import org.apache.jute.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MultiResponse;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.SessionMovedException;
@@ -54,6 +55,15 @@ import org.apache.zookeeper.server.ZooKe
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
 
+import org.apache.zookeeper.MultiTransactionRecord;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.OpResult.CheckResult;
+import org.apache.zookeeper.OpResult.CreateResult;
+import org.apache.zookeeper.OpResult.DeleteResult;
+import org.apache.zookeeper.OpResult.SetDataResult;
+import org.apache.zookeeper.OpResult.ErrorResult;
+
 /**
  * This Request processor actually applies any transaction associated with a
  * request and services any queries. It is always at the end of a
@@ -151,7 +161,7 @@ public class FinalRequestProcessor imple
             }
 
             KeeperException ke = request.getException();
-            if (ke != null) {
+            if (ke != null && request.type != OpCode.multi) {
                 throw ke;
             }
 
@@ -180,6 +190,39 @@ public class FinalRequestProcessor imple
                 zks.finishSessionInit(request.cnxn, true);
                 return;
             }
+            case OpCode.multi: {
+                lastOp = "MULT";
+                rsp = new MultiResponse() ;
+
+                for (ProcessTxnResult subTxnResult : rc.multiResult) {
+
+                    OpResult subResult ;
+
+                    switch (subTxnResult.type) {
+                        case OpCode.check:
+                            subResult = new CheckResult();
+                            break;
+                        case OpCode.create:
+                            subResult = new CreateResult(subTxnResult.path);
+                            break;
+                        case OpCode.delete:
+                            subResult = new DeleteResult();
+                            break;
+                        case OpCode.setData:
+                            subResult = new SetDataResult(subTxnResult.stat);
+                            break;
+                        case OpCode.error:
+                            subResult = new ErrorResult(subTxnResult.err) ;
+                            break;
+                        default:
+                            throw new IOException("Invalid type of op");
+                    }
+
+                    ((MultiResponse)rsp).add(subResult);
+                }
+
+                break;
+            }
             case OpCode.create: {
                 lastOp = "CREA";
                 rsp = new CreateResponse(rc.path);
@@ -217,6 +260,12 @@ public class FinalRequestProcessor imple
                 rsp = new SyncResponse(syncRequest.getPath());
                 break;
             }
+            case OpCode.check: {
+                lastOp = "CHEC";
+                rsp = new SetDataResponse(rc.stat);
+                err = Code.get(rc.err);
+                break;
+            }
             case OpCode.exists: {
                 lastOp = "EXIS";
                 // TODO we need to figure out the security requirement for this!

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Thu Jun 30 22:53:28 2011
@@ -18,19 +18,27 @@
 
 package org.apache.zookeeper.server;
 
+import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.Record;
+import org.apache.jute.BinaryOutputArchive;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MultiTransactionRecord;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.ZooDefs.OpCode;
@@ -42,6 +50,7 @@ import org.apache.zookeeper.proto.Create
 import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.SetACLRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.proto.CheckVersionRequest;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.server.auth.AuthenticationProvider;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
@@ -51,6 +60,9 @@ import org.apache.zookeeper.txn.DeleteTx
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.CheckVersionTxn;
+import org.apache.zookeeper.txn.Txn;
+import org.apache.zookeeper.txn.MultiTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 
 /**
@@ -161,6 +173,78 @@ public class PrepRequestProcessor extend
         }
     }
 
+    /**
+     * Grab current pending change records for each op in a multi-op.
+     * This is used inside MultiOp error code path to rollback in the event
+     * of a failed multi-op.
+     *
+     * @param multiRequest the multi-op whose ops' paths are looked up
+     * @return map of op path to the record getRecordForPath() returned for it
+     */
+    HashMap<String, ChangeRecord> getPendingChanges(MultiTransactionRecord multiRequest) {
+    	HashMap<String, ChangeRecord> pendingChangeRecords = new HashMap<String, ChangeRecord>();
+        // NOTE(review): only op.getPath() is saved; create/delete also queue a parent-path record - confirm rollback covers it
+        for(Op op: multiRequest) {
+    		String path = op.getPath();
+
+    		try {
+    			ChangeRecord cr = getRecordForPath(path);
+    			if (cr != null) {
+    				pendingChangeRecords.put(path, cr);
+    			}
+    		} catch (KeeperException.NoNodeException e) {
+    			// ignore this one: no record for this path, so nothing to restore on rollback
+    		}
+    	}
+        
+        return pendingChangeRecords;
+    }
+
+    /**
+     * Rollback pending changes records from a failed multi-op.
+     *
+     * If a multi-op fails, we can't leave any invalid change records we created
+     * around. We also need to restore their prior value (if any) if their prior
+     * value is still valid.
+     *
+     * @param zxid zxid of the failed multi-op; trailing records carrying it are removed
+     * @param pendingChangeRecords prior records to put back into outstandingChangesForPath
+     */
+    void rollbackPendingChanges(long zxid, HashMap<String, ChangeRecord>pendingChangeRecords) {
+
+        synchronized (zks.outstandingChanges) {
+            // Grab a list iterator starting at the END of the list so we can iterate in reverse
+            ListIterator<ChangeRecord> iter = zks.outstandingChanges.listIterator(zks.outstandingChanges.size());
+            while (iter.hasPrevious()) {
+                ChangeRecord c = iter.previous();
+                if (c.zxid == zxid) { // record queued under the failed multi-op's zxid
+                    iter.remove();
+                    zks.outstandingChangesForPath.remove(c.path);
+                } else {
+                    break; // first record with a different zxid ends the backwards scan
+                }
+            }
+            // zxid of the first record still outstanding; stays 0 when the list is now empty
+            boolean empty = zks.outstandingChanges.isEmpty();
+            long firstZxid = 0;
+            if (!empty) {
+                firstZxid = zks.outstandingChanges.get(0).zxid;
+            }
+
+            Iterator<ChangeRecord> priorIter = pendingChangeRecords.values().iterator();
+            while (priorIter.hasNext()) {
+                ChangeRecord c = priorIter.next();
+                 
+                /* Don't apply any prior change records less than firstZxid */
+                if (!empty && (c.zxid < firstZxid)) {
+                    continue;
+                }
+
+                zks.outstandingChangesForPath.put(c.path, c); // restore the pre-multi-op record for this path
+            }
+        }
+    }
+
     static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
             List<Id> ids) throws KeeperException.NoAuthException {
         if (skipACL) {
@@ -200,23 +284,20 @@ public class PrepRequestProcessor extend
      * This method will be called inside the ProcessRequestThread, which is a
      * singleton, so there will be a single thread calling this code.
      *
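+     * @param type the OpCode to prepare; selects the case below and is stored in the TxnHeader
+     * @param zxid the zxid placed in the TxnHeader and on any change records created here
      * @param request
+     * @param record the already-deserialized request record for this op; null for createSession/closeSession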
      */
     @SuppressWarnings("unchecked")
-    protected void pRequest(Request request) {
-        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
-        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
-        TxnHeader txnHeader = null;
-        Record txn = null;
-        try {
-            switch (request.type) {
+    protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException {
+        request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
+                                    zks.getTime(), type);
+
+        switch (type) {
             case OpCode.create:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.create);
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                CreateRequest createRequest = new CreateRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
-                        createRequest);
+                CreateRequest createRequest = (CreateRequest)record;
                 String path = createRequest.getPath();
                 int lastSlash = path.lastIndexOf('/');
                 if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
@@ -257,28 +338,24 @@ public class PrepRequestProcessor extend
                     throw new KeeperException.NoChildrenForEphemeralsException(path);
                 }
                 int newCversion = parentRecord.stat.getCversion()+1;
-                txn = new CreateTxn(path, createRequest.getData(),
+                request.txn = new CreateTxn(path, createRequest.getData(),
                         createRequest.getAcl(),
                         createMode.isEphemeral(), newCversion);
                 StatPersisted s = new StatPersisted();
                 if (createMode.isEphemeral()) {
                     s.setEphemeralOwner(request.sessionId);
                 }
-                parentRecord = parentRecord.duplicate(txnHeader.getZxid());
+                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                 parentRecord.childCount++;
                 parentRecord.stat.setCversion(newCversion);
                 addChangeRecord(parentRecord);
-                addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path, s,
+                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
                         0, createRequest.getAcl()));
 
                 break;
             case OpCode.delete:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.delete);
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                DeleteRequest deleteRequest = new DeleteRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
-                        deleteRequest);
+                DeleteRequest deleteRequest = (DeleteRequest)record;
                 path = deleteRequest.getPath();
                 lastSlash = path.lastIndexOf('/');
                 if (lastSlash == -1 || path.indexOf('\0') != -1
@@ -297,20 +374,16 @@ public class PrepRequestProcessor extend
                 if (nodeRecord.childCount > 0) {
                     throw new KeeperException.NotEmptyException(path);
                 }
-                txn = new DeleteTxn(path);
-                parentRecord = parentRecord.duplicate(txnHeader.getZxid());
+                request.txn = new DeleteTxn(path);
+                parentRecord = parentRecord.duplicate(request.hdr.getZxid());
                 parentRecord.childCount--;
                 addChangeRecord(parentRecord);
-                addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path,
+                addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path,
                         null, -1, null));
                 break;
             case OpCode.setData:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.setData);
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                SetDataRequest setDataRequest = new SetDataRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
-                        setDataRequest);
+                SetDataRequest setDataRequest = (SetDataRequest)record;
                 path = setDataRequest.getPath();
                 nodeRecord = getRecordForPath(path);
                 checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
@@ -321,18 +394,14 @@ public class PrepRequestProcessor extend
                     throw new KeeperException.BadVersionException(path);
                 }
                 version = currentVersion + 1;
-                txn = new SetDataTxn(path, setDataRequest.getData(), version);
-                nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
+                request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
+                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                 nodeRecord.stat.setVersion(version);
                 addChangeRecord(nodeRecord);
                 break;
             case OpCode.setACL:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.setACL);
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
-                SetACLRequest setAclRequest = new SetACLRequest();
-                ZooKeeperServer.byteBuffer2Record(request.request,
-                        setAclRequest);
+                SetACLRequest setAclRequest = (SetACLRequest)record;
                 path = setAclRequest.getPath();
                 if (!fixupACL(request.authInfo, setAclRequest.getAcl())) {
                     throw new KeeperException.InvalidACLException(path);
@@ -346,24 +415,20 @@ public class PrepRequestProcessor extend
                     throw new KeeperException.BadVersionException(path);
                 }
                 version = currentVersion + 1;
-                txn = new SetACLTxn(path, setAclRequest.getAcl(), version);
-                nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
+                request.txn = new SetACLTxn(path, setAclRequest.getAcl(), version);
+                nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
                 nodeRecord.stat.setAversion(version);
                 addChangeRecord(nodeRecord);
                 break;
             case OpCode.createSession:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.createSession);
                 request.request.rewind();
                 int to = request.request.getInt();
-                txn = new CreateSessionTxn(to);
+                request.txn = new CreateSessionTxn(to);
                 request.request.rewind();
                 zks.sessionTracker.addSession(request.sessionId, to);
                 zks.setOwner(request.sessionId, request.getOwner());
                 break;
             case OpCode.closeSession:
-                txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
-                        .getNextZxid(), zks.getTime(), OpCode.closeSession);
                 // We don't want to do this check since the session expiration thread
                 // queues up this operation without being the session owner.
                 // this request is the last of the session so it should be ok
@@ -380,13 +445,142 @@ public class PrepRequestProcessor extend
                         }
                     }
                     for (String path2Delete : es) {
-                        addChangeRecord(new ChangeRecord(txnHeader.getZxid(),
+                        addChangeRecord(new ChangeRecord(request.hdr.getZxid(),
                                 path2Delete, null, 0, null));
                     }
                 }
                 LOG.info("Processed session termination for sessionid: 0x"
                         + Long.toHexString(request.sessionId));
                 break;
+            case OpCode.check:
+                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
+                CheckVersionRequest checkVersionRequest = (CheckVersionRequest)record;
+                path = checkVersionRequest.getPath();
+                nodeRecord = getRecordForPath(path);
+                checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ,
+                        request.authInfo);
+                version = checkVersionRequest.getVersion();
+                currentVersion = nodeRecord.stat.getVersion();
+                if (version != -1 && version != currentVersion) {
+                    throw new KeeperException.BadVersionException(path);
+                }
+                version = currentVersion + 1;
+                request.txn = new CheckVersionTxn(path, version);
+                break;
+        }
+    }
+
+    /**
+     * This method will be called inside the ProcessRequestThread, which is a
+     * singleton, so there will be a single thread calling this code.
+     *
+     * @param request
+     */
+    @SuppressWarnings("unchecked")
+    protected void pRequest(Request request) {
+        // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
+        // request.type + " id = 0x" + Long.toHexString(request.sessionId));
+        request.hdr = null;
+        request.txn = null;
+        
+        try {
+            switch (request.type) {
+                case OpCode.create:
+                CreateRequest createRequest = new CreateRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, createRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest);
+                break;
+            case OpCode.delete:
+                DeleteRequest deleteRequest = new DeleteRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, deleteRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
+                break;
+            case OpCode.setData:
+                SetDataRequest setDataRequest = new SetDataRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, setDataRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
+                break;
+            case OpCode.setACL:
+                SetACLRequest setAclRequest = new SetACLRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, setAclRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
+                break;
+            case OpCode.check:
+                CheckVersionRequest checkRequest = new CheckVersionRequest();
+                ZooKeeperServer.byteBuffer2Record(request.request, checkRequest);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
+                break;
+            case OpCode.multi:
+                MultiTransactionRecord multiRequest = new MultiTransactionRecord();
+                ZooKeeperServer.byteBuffer2Record(request.request, multiRequest);
+                List<Txn> txns = new ArrayList<Txn>();
+
+                //Each op in a multi-op must have the same zxid!
+                long zxid = zks.getNextZxid();
+                KeeperException ke = null;
+
+                //Store off current pending change records in case we need to rollback
+                HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
+
+                int index = 0;
+                for(Op op: multiRequest) {
+                    Record subrequest = op.toRequestRecord() ;
+
+                    /* If we've already failed one of the ops, don't bother
+                     * trying the rest as we know it's going to fail and it
+                     * would be confusing in the logfiles.
+                     */
+                    if (ke != null) {
+                        request.hdr.setType(OpCode.error);
+                        request.txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
+                    } 
+                    
+                    /* Prep the request and convert to a Txn */
+                    else {
+                        try {
+                            pRequest2Txn(op.getType(), zxid, request, subrequest);
+                        } catch (KeeperException e) {
+                            if (ke == null) {
+                                ke = e;
+                            }
+                            request.hdr.setType(OpCode.error);
+                            request.txn = new ErrorTxn(e.code().intValue());
+                            LOG.error(">>>> Got user-level KeeperException when processing "
+                                    + request.toString()
+                                    + " Error Path:" + e.getPath()
+                                    + " Error:" + e.getMessage());
+                            LOG.error(">>>> ABORTING remaing MultiOp ops");
+                            request.setException(e);
+
+                            /* Rollback change records from failed multi-op */
+                            rollbackPendingChanges(zxid, pendingChanges);
+                        }
+                    }
+
+                    //FIXME: I don't want to have to serialize it here and then
+                    //       immediately deserialize in next processor. But I'm 
+                    //       not sure how else to get the txn stored into our list.
+                    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                    request.txn.serialize(boa, "request") ;
+                    ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+                    txns.add(new Txn(request.hdr.getType(), bb.array()));
+                    index++;
+                }
+
+                request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), request.type);
+                request.txn = new MultiTxn(txns);
+                
+                break;
+
+            //create/close session don't require request record
+            case OpCode.createSession:
+            case OpCode.closeSession:
+                pRequest2Txn(request.type, zks.getNextZxid(), request, null);
+                break;
+ 
+            //All the rest don't need to create a Txn - just verify session
             case OpCode.sync:
             case OpCode.exists:
             case OpCode.getData:
@@ -400,9 +594,9 @@ public class PrepRequestProcessor extend
                 break;
             }
         } catch (KeeperException e) {
-            if (txnHeader != null) {
-                txnHeader.setType(OpCode.error);
-                txn = new ErrorTxn(e.code().intValue());
+            if (request.hdr != null) {
+                request.hdr.setType(OpCode.error);
+                request.txn = new ErrorTxn(e.code().intValue());
             }
             LOG.info("Got user-level KeeperException when processing "
                     + request.toString()
@@ -426,13 +620,11 @@ public class PrepRequestProcessor extend
             }
 
             LOG.error("Dumping request buffer: 0x" + sb.toString());
-            if (txnHeader != null) {
-                txnHeader.setType(OpCode.error);
-                txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
+            if (request.hdr != null) {
+                request.hdr.setType(OpCode.error);
+                request.txn = new ErrorTxn(Code.MARSHALLINGERROR.intValue());
             }
         }
-        request.hdr = txnHeader;
-        request.txn = txn;
         request.zxid = zks.getZxid();
         nextProcessor.processRequest(request);
     }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Thu Jun 30 22:53:28 2011
@@ -106,6 +106,8 @@ public class Request {
         case OpCode.createSession:
         case OpCode.exists:
         case OpCode.getData:
+        case OpCode.check:
+        case OpCode.multi:
         case OpCode.setData:
         case OpCode.sync:
         case OpCode.getACL:
@@ -136,6 +138,8 @@ public class Request {
         case OpCode.delete:
         case OpCode.setACL:
         case OpCode.setData:
+        case OpCode.check:
+        case OpCode.multi:
             return true;
         default:
             return false;
@@ -156,6 +160,10 @@ public class Request {
             return "exists";
         case OpCode.getData:
             return "getData";
+        case OpCode.check:
+            return "check";
+        case OpCode.multi:
+            return "multi";
         case OpCode.setData:
             return "setData";
         case OpCode.sync:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RequestProcessor.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RequestProcessor.java Thu Jun 30 22:53:28 2011
@@ -21,7 +21,7 @@ package org.apache.zookeeper.server;
 /**
  * RequestProcessors are chained together to process transactions. Requests are
  * always processed in order. The standalone server, follower, and leader all
- * have slightly different RequestProcessors changed together.
+ * have slightly different RequestProcessors chained together.
  * 
  * Requests always move forward through the chain of RequestProcessors. Requests
  * are passed to a RequestProcessor through processRequest(). Generally method

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java Thu Jun 30 22:53:28 2011
@@ -43,6 +43,8 @@ public class TraceFormatter {
             return "getDate";
         case OpCode.setData:
             return "setData";
+        case OpCode.multi:
+            return "multi";
         case OpCode.getACL:
             return "getACL";
         case OpCode.setACL:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/package.html
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/package.html?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/package.html (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/package.html Thu Jun 30 22:53:28 2011
@@ -30,12 +30,12 @@ ZooKeeper maintains a order when process
 </ul>
 <p>
 We will explain the three aspects of ZooKeeperServer: request processing, data
-structure maintence, and session tracking.
+structure maintenance, and session tracking.
 
 <h2>Request processing</h2>
 
 Requests are received by the ServerCnxn. Demarshalling of a request is
-done by ClientRequestHandler. After a request has be Demarshalled,
+done by ClientRequestHandler. After a request has been demarshalled,
 ClientRequestHandler invokes the relevant method in ZooKeeper and marshals
 the result.
 <p>
@@ -51,10 +51,10 @@ txnQueue of SyncThread via queueItem. Wh
 disk, its callback will be invoked which will cause the request processing to
 be completed.
 
-<h2>Data structure maintence</h2>
+<h2>Data structure maintenance</h2>
 
 ZooKeeper data is stored in-memory. Each znode is stored in a DataNode object.
-This object is accessed through a hashtable that maps paths to DataNodes.
+This object is accessed through a hash table that maps paths to DataNodes.
 DataNodes also organize themselves into a tree. This tree is only used for
 serializing nodes.
 <p>
@@ -66,7 +66,7 @@ at any point, we need to be careful of p
 <p>
 We address the above problems by
 <ul>
-<li>Preallocating 1M chunks of file space. This allows us to append to the
+<li>Pre-allocating 1M chunks of file space. This allows us to append to the
 file without causing seeks to update file size. It also means that we need
 to check for the end of the log by looking for a zero length transaction
 rather than simply end of file.
@@ -84,7 +84,7 @@ and l as the sequence of transactions th
 the time the snapshot begins and the time the snapshot completes, we write
 to disk T+l' where l' is a subset of the transactions in l. While we do not
 have a way of figuring out which transactions make up l', it doesn't really
-matter. T+l'+l = T+l since the transactions we log are idepotent (applying
+matter. T+l'+l = T+l since the transactions we log are idempotent (applying
 the transaction multiple times has the same result as applying the transaction
 once). So when we restore the snapshot we also play all transactions in the log
 that occur after the snapshot was begun. We can easily figure out where to

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Thu Jun 30 22:53:28 2011
@@ -123,6 +123,7 @@ public class CommitProcessor extends Thr
                         case OpCode.create:
                         case OpCode.delete:
                         case OpCode.setData:
+                        case OpCode.multi:
                         case OpCode.setACL:
                         case OpCode.createSession:
                         case OpCode.closeSession:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java?rev=1141746&r1=1141745&r2=1141746&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java Thu Jun 30 22:53:28 2011
@@ -43,6 +43,7 @@ import org.apache.zookeeper.txn.ErrorTxn
 import org.apache.zookeeper.txn.SetACLTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
+import org.apache.zookeeper.txn.MultiTxn;
 
 public class SerializeUtils {
     private static final Logger LOG = LoggerFactory.getLogger(SerializeUtils.class);
@@ -78,6 +79,11 @@ public class SerializeUtils {
         case OpCode.error:
             txn = new ErrorTxn();
             break;
+        case OpCode.multi:
+            txn = new MultiTxn();
+            break;
+        default: // no txn Record is known for this opcode
+            throw new IOException("Unsupported Txn with type=" + hdr.getType());
         }
         if (txn != null) {
             try {

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/MultiResponseTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/MultiResponseTest.java?rev=1141746&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/MultiResponseTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/MultiResponseTest.java Thu Jun 30 22:53:28 2011
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import junit.framework.TestCase;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Verifies that a MultiResponse survives serialization to, and
+ * deserialization from, the binary jute archive format.
+ */
+public class MultiResponseTest extends TestCase {
+    /** Round-trips a response carrying check, create, delete and setData results. */
+    @Test
+    public void testRoundTrip() throws IOException {
+        MultiResponse response = new MultiResponse();
+
+        response.add(new OpResult.CheckResult());
+        response.add(new OpResult.CreateResult("foo-bar"));
+        response.add(new OpResult.DeleteResult());
+
+        Stat s = new Stat();
+        s.setCzxid(546);
+        response.add(new OpResult.SetDataResult(s));
+
+        MultiResponse decodedResponse = codeDecode(response);
+
+        assertEquals(response, decodedResponse);
+        assertEquals(response.hashCode(), decodedResponse.hashCode());
+    }
+
+    /** Round-trips a response that holds no results. */
+    @Test
+    public void testEmptyRoundTrip() throws IOException {
+        MultiResponse result = new MultiResponse();
+        MultiResponse decodedResult = codeDecode(result);
+
+        assertEquals(result, decodedResult);
+        assertEquals(result.hashCode(), decodedResult.hashCode());
+    }
+
+    /**
+     * Serializes the response to a byte array and deserializes those bytes
+     * into a fresh MultiResponse.
+     *
+     * @param response the response to encode
+     * @return the copy decoded from the serialized bytes
+     * @throws IOException if serialization or deserialization fails
+     */
+    private MultiResponse codeDecode(MultiResponse response) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        response.serialize(boa, "result");
+        baos.close();
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        bb.rewind();
+
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
+        MultiResponse decodedResponse = new MultiResponse();
+        decodedResponse.deserialize(bia, "result");
+        return decodedResponse;
+    }
+}

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/MultiTransactionRecordTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/MultiTransactionRecordTest.java?rev=1141746&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/MultiTransactionRecordTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/MultiTransactionRecordTest.java Thu Jun 30 22:53:28 2011
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import junit.framework.TestCase;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Verifies that a MultiTransactionRecord survives serialization to, and
+ * deserialization from, the binary jute archive format.
+ */
+public class MultiTransactionRecordTest extends TestCase {
+    /** Round-trips a request holding check, create, delete and setData ops. */
+    @Test
+    public void testRoundTrip() throws IOException {
+        MultiTransactionRecord request = new MultiTransactionRecord();
+        request.add(Op.check("check", 1));
+        request.add(Op.create("create", "create data".getBytes(), ZooDefs.Ids.CREATOR_ALL_ACL, ZooDefs.Perms.ALL));
+        request.add(Op.delete("delete", 17));
+        request.add(Op.setData("setData", "set data".getBytes(), 19));
+
+        MultiTransactionRecord decodedRequest = codeDecode(request);
+
+        assertEquals(request, decodedRequest);
+        assertEquals(request.hashCode(), decodedRequest.hashCode());
+    }
+
+    /** Round-trips a request that holds no ops. */
+    @Test
+    public void testEmptyRoundTrip() throws IOException {
+        MultiTransactionRecord request = new MultiTransactionRecord();
+        MultiTransactionRecord decodedRequest = codeDecode(request);
+
+        assertEquals(request, decodedRequest);
+        assertEquals(request.hashCode(), decodedRequest.hashCode());
+    }
+
+    /**
+     * Serializes the request to a byte array and deserializes those bytes
+     * into a fresh MultiTransactionRecord.
+     *
+     * @param request the request to encode
+     * @return the copy decoded from the serialized bytes
+     * @throws IOException if serialization or deserialization fails
+     */
+    private MultiTransactionRecord codeDecode(MultiTransactionRecord request) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        request.serialize(boa, "request");
+        baos.close();
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+        bb.rewind();
+
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
+        MultiTransactionRecord decodedRequest = new MultiTransactionRecord();
+        decodedRequest.deserialize(bia, "request");
+        return decodedRequest;
+    }
+}

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java?rev=1141746&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java Thu Jun 30 22:53:28 2011
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.ArrayList;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Index;
+import org.apache.jute.InputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Learner;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.txn.CreateTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Drives Learner.syncWithLeader() with a canned, in-memory leader stream.
+ */
+public class LearnerTest extends ZKTestCase {
+    /** LearnerZooKeeperServer stub whose startup() only records the call. */
+    class SimpleLearnerZooKeeperServer extends LearnerZooKeeperServer {
+        boolean startupCalled;
+        Learner learner;
+
+        public SimpleLearnerZooKeeperServer(FileTxnSnapLog ftsl) throws IOException {
+            super(ftsl, 2000, 2000, 2000, null, new ZKDatabase(ftsl), null);
+        }
+
+        @Override
+        public Learner getLearner() {
+            return learner;
+        }
+
+        @Override
+        public void startup() {
+            startupCalled = true;
+        }
+    }
+
+    /** Learner backed by a default QuorumPeer and the stub server above. */
+    class SimpleLearner extends Learner {
+        SimpleLearner(FileTxnSnapLog ftsl) throws IOException {
+            self = new QuorumPeer();
+            zk = new SimpleLearnerZooKeeperServer(ftsl);
+            ((SimpleLearnerZooKeeperServer) zk).learner = this;
+        }
+    }
+
+    /**
+     * Deletes a file or directory together with everything beneath it.
+     * A null or non-existent path is ignored.
+     *
+     * @param dir the file or directory to remove; may be null
+     */
+    private static void recursiveDelete(File dir) {
+        if (dir == null || !dir.exists()) {
+            return;
+        }
+        // listFiles() returns null for plain files and on I/O errors, so
+        // only descend when it actually produced a listing.
+        File[] children = dir.isDirectory() ? dir.listFiles() : null;
+        if (children != null) {
+            for (File child : children) {
+                recursiveDelete(child);
+            }
+        }
+        // Finally remove the plain file, or the now-empty directory, itself.
+        dir.delete();
+    }
+
+    /**
+     * Streams a SNAP packet, a snapshot and one PROPOSAL (create /foo) to the
+     * learner, then checks that a learner re-created on the same
+     * FileTxnSnapLog still reports the zxid observed before the sync.
+     */
+    @Test
+    public void syncTest() throws Exception {
+        // Only the unique name is needed; the path is handed to FileTxnSnapLog.
+        File tmpFile = File.createTempFile("test", ".dir");
+        tmpFile.delete();
+        try {
+            FileTxnSnapLog ftsl = new FileTxnSnapLog(tmpFile, tmpFile);
+            SimpleLearner sl = new SimpleLearner(ftsl);
+            long startZxid = sl.zk.getLastProcessedZxid();
+
+            // Set up bogus streams
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            BinaryOutputArchive oa = BinaryOutputArchive.getArchive(baos);
+            sl.leaderOs = BinaryOutputArchive.getArchive(new ByteArrayOutputStream());
+
+            // make streams and socket do something innocuous
+            sl.bufferedOutput = new BufferedOutputStream(System.out);
+            sl.sock = new Socket();
+
+            // fake messages from the server
+            QuorumPacket qp = new QuorumPacket(Leader.SNAP, 0, null, null);
+            oa.writeRecord(qp, null);
+            sl.zk.getZKDatabase().serializeSnapshot(oa);
+            oa.writeString("BenWasHere", "signature");
+            TxnHeader hdr = new TxnHeader(0, 0, 0, 0, ZooDefs.OpCode.create);
+            CreateTxn txn = new CreateTxn("/foo", new byte[0], new ArrayList<ACL>(), false, sl.zk.getZKDatabase().getNode("/").stat.getCversion());
+            ByteArrayOutputStream tbaos = new ByteArrayOutputStream();
+            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(tbaos);
+            hdr.serialize(boa, "hdr");
+            txn.serialize(boa, "txn");
+            tbaos.close();
+            qp = new QuorumPacket(Leader.PROPOSAL, 1, tbaos.toByteArray(), null);
+            oa.writeRecord(qp, null);
+
+            // setup the messages to be streamed to follower
+            sl.leaderIs = BinaryInputArchive.getArchive(new ByteArrayInputStream(baos.toByteArray()));
+
+            try {
+                sl.syncWithLeader(3);
+            } catch (EOFException e) {
+                // Tolerated: the canned stream holds nothing after the PROPOSAL.
+            }
+
+            sl.zk.shutdown();
+            sl = new SimpleLearner(ftsl);
+            Assert.assertEquals(startZxid, sl.zk.getLastProcessedZxid());
+        } finally {
+            recursiveDelete(tmpFile);
+        }
+    }
+}

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java.orig
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java.orig?rev=1141746&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java.orig (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java.orig Thu Jun 30 22:53:28 2011
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.io.*;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for QuorumCnxManager: message exchange between two managers, sending
+ * to an unreachable peer, and reactions to malformed or silent connections.
+ */
+public class CnxManagerTest extends ZKTestCase {
+    protected static final Logger LOG = Logger.getLogger(CnxManagerTest.class);
+    /** Number of receive attempts allowed before a test gives up. */
+    protected static final int THRESHOLD = 4;
+
+    int count;
+    HashMap<Long,QuorumServer> peers;
+    File tmpdir[];
+    int port[];
+
+    /** Registers three peers on unique ports, each with its own temp directory. */
+    @Before
+    public void setUp() throws Exception {
+
+        this.count = 3;
+        this.peers = new HashMap<Long,QuorumServer>(count);
+        tmpdir = new File[count];
+        port = new int[count];
+
+        for(int i = 0; i < count; i++) {
+            int clientport = PortAssignment.unique();
+            peers.put(Long.valueOf(i),
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                    new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = clientport;
+        }
+    }
+
+    /**
+     * Builds a 28-byte notification: an int state followed by the leader,
+     * zxid and epoch as longs.
+     */
+    ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
+        byte requestBytes[] = new byte[28];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+        /*
+         * Building notification packet to send
+         */
+
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(epoch);
+
+        return requestBuffer;
+    }
+
+    /**
+     * Runs a connection manager on the index-0 peer settings: sends a
+     * notification to sid 1, waits for a message, re-initiates the connection
+     * and waits for a second message. The failed flag is set when a message
+     * never arrives or an exception occurs.
+     */
+    class CnxManagerThread extends Thread {
+
+        boolean failed;
+        CnxManagerThread(){
+            failed = false;
+        }
+
+        @Override
+        public void run(){
+            try {
+                QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
+                QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+                QuorumCnxManager.Listener listener = cnxManager.listener;
+                if(listener != null){
+                    listener.start();
+                } else {
+                    LOG.error("Null listener when initializing cnx manager");
+                }
+
+                long sid = 1;
+                cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
+
+                Message m = null;
+                int numRetries = 1;
+                while((m == null) && (numRetries++ <= THRESHOLD)){
+                    m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+                    if(m == null) cnxManager.connectAll();
+                }
+
+                // Judge by the message itself: the counter also exceeds
+                // THRESHOLD when the message arrives on the last attempt.
+                if(m == null){
+                    failed = true;
+                    return;
+                }
+
+                cnxManager.testInitiateConnection(sid);
+
+                m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+                if(m == null){
+                    failed = true;
+                    return;
+                }
+            } catch (Exception e) {
+                LOG.error("Exception while running mock thread", e);
+                // Assert.fail() here would only kill this thread without
+                // failing the test; report through the flag instead.
+                failed = true;
+            }
+        }
+    }
+
+    /**
+     * Exchanges notifications between two connection managers and expects
+     * each side to receive a message from the other.
+     */
+    @Test
+    public void testCnxManager() throws Exception {
+        CnxManagerThread thread = new CnxManagerThread();
+
+        thread.start();
+
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        cnxManager.toSend(Long.valueOf(0L), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
+
+        Message m = null;
+        int numRetries = 1;
+        while((m == null) && (numRetries++ <= THRESHOLD)){
+            m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
+            if(m == null) cnxManager.connectAll();
+        }
+
+        // Check the message rather than the counter, which also exceeds
+        // THRESHOLD when the message arrives on the last attempt.
+        Assert.assertNotNull("Exceeded number of retries", m);
+
+        thread.join(5000);
+        if (thread.isAlive()) {
+            Assert.fail("Thread didn't join");
+        } else if (thread.failed) {
+            Assert.fail("Did not receive expected message");
+        }
+    }
+
+    /**
+     * Sends to a peer registered on a presumably unreachable address and
+     * checks that toSend() returns within six seconds.
+     */
+    @Test
+    public void testCnxManagerTimeout() throws Exception {
+        Random rand = new Random();
+        // Keep the last octet in 1..254; a signed byte could be negative and
+        // produce a malformed address such as "10.1.1.-5".
+        int lastOctet = rand.nextInt(254) + 1;
+        int deadPort = PortAssignment.unique();
+        String deadAddress = "10.1.1." + lastOctet;
+
+        LOG.info("This is the dead address I'm trying: " + deadAddress);
+
+        peers.put(Long.valueOf(2),
+                new QuorumServer(2,
+                        new InetSocketAddress(deadAddress, deadPort),
+                        new InetSocketAddress(deadAddress, PortAssignment.unique())));
+        tmpdir[2] = ClientBase.createTmpDir();
+        port[2] = deadPort;
+
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        long begin = System.currentTimeMillis();
+        cnxManager.toSend(Long.valueOf(2L), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
+        long end = System.currentTimeMillis();
+
+        if((end - begin) > 6000) Assert.fail("Waited more than necessary");
+    }
+
+    /**
+     * Tests a bug in QuorumCnxManager that causes a spin lock
+     * when a negative value is sent. This test checks if the
+     * connection is being closed upon a message with negative
+     * length.
+     *
+     * @throws Exception if setup or socket I/O fails unexpectedly
+     */
+    @Test
+    public void testCnxManagerSpinLock() throws Exception {
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+
+        int electionPort = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + electionPort);
+
+        Thread.sleep(1000);
+
+        SocketChannel sc = SocketChannel.open();
+        try {
+            sc.socket().connect(peers.get(Long.valueOf(1L)).electionAddr, 5000);
+
+            /*
+             * Write id first then negative length.
+             */
+            byte[] msgBytes = new byte[8];
+            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+            msgBuffer.putLong(2L);
+            msgBuffer.position(0);
+            sc.write(msgBuffer);
+
+            msgBuffer = ByteBuffer.wrap(new byte[4]);
+            msgBuffer.putInt(-20);
+            msgBuffer.position(0);
+            sc.write(msgBuffer);
+
+            Thread.sleep(1000);
+
+            try{
+                /*
+                 * Write a number of times until it
+                 * detects that the socket is broken.
+                 */
+                for(int i = 0; i < 100; i++){
+                    msgBuffer.position(0);
+                    sc.write(msgBuffer);
+                }
+                Assert.fail("Socket has not been closed");
+            } catch (Exception e) {
+                LOG.info("Socket has been closed as expected");
+            }
+        } finally {
+            // Release the client channel whether or not the peer closed it.
+            sc.close();
+        }
+    }
+
+    /**
+     * Test if a receiveConnection is able to timeout on socket errors
+     */
+    @Test
+    public void testSocketTimeout() throws Exception {
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2000, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+        int electionPort = peers.get(peer.getId()).electionAddr.getPort();
+        LOG.info("Election port: " + electionPort);
+        Thread.sleep(1000);
+
+        Socket sock = new Socket();
+        try {
+            sock.connect(peers.get(Long.valueOf(1L)).electionAddr, 5000);
+            long begin = System.currentTimeMillis();
+            // Read without sending data. Verify timeout.
+            cnxManager.receiveConnection(sock);
+            long end = System.currentTimeMillis();
+            if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary");
+        } finally {
+            sock.close();
+        }
+    }
+}