You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ra...@apache.org on 2014/07/18 19:46:52 UTC

svn commit: r1611732 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: rakeshr
Date: Fri Jul 18 17:46:51 2014
New Revision: 1611732

URL: http://svn.apache.org/r1611732
Log:
ZOOKEEPER-1851. Follower and Observer Request Processors Do Not Forward create2 Requests (Chris Chen via rakeshr)

Added:
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Jul 18 17:46:51 2014
@@ -705,6 +705,9 @@ BUGFIXES:
   ZOOKEEPER-1969. Fix Port Already In Use for JettyAdminServerTest
   (Hongchao Deng via phunt)
 
+  ZOOKEEPER-1851. Follower and Observer Request Processors Do Not Forward
+  create2 Requests (Chris Chen via rakeshr)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java Fri Jul 18 17:46:51 2014
@@ -35,6 +35,8 @@ public class TraceFormatter {
             return "notification";
         case OpCode.create:
             return "create";
+        case OpCode.create2:
+            return "create2";
         case OpCode.delete:
             return "delete";
         case OpCode.exists:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Fri Jul 18 17:46:51 2014
@@ -131,6 +131,7 @@ public class CommitProcessor extends Zoo
     protected boolean needCommit(Request request) {
         switch (request.type) {
             case OpCode.create:
+            case OpCode.create2:
             case OpCode.delete:
             case OpCode.setData:
             case OpCode.reconfig:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Fri Jul 18 17:46:51 2014
@@ -82,6 +82,7 @@ public class FollowerRequestProcessor ex
                     zks.getFollower().request(request);
                     break;
                 case OpCode.create:
+                case OpCode.create2:
                 case OpCode.delete:
                 case OpCode.setData:
                 case OpCode.reconfig:

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Fri Jul 18 17:46:51 2014
@@ -91,6 +91,7 @@ public class ObserverRequestProcessor ex
                     zks.getObserver().request(request);
                     break;
                 case OpCode.create:
+                case OpCode.create2:
                 case OpCode.delete:
                 case OpCode.setData:
                 case OpCode.reconfig:

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java?rev=1611732&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java Fri Jul 18 17:46:51 2014
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.QuorumBase;
+
+@RunWith(Parameterized.class)
+public class QuorumRequestPipelineTest extends QuorumBase {
+    protected ServerState serverState;
+    protected final CountDownLatch callComplete = new CountDownLatch(1);
+    protected boolean complete = false;
+    protected final static String PARENT_PATH = "/foo";
+    protected final static HashSet<String> CHILDREN = new HashSet<String>(Arrays.asList("1", "2", "3"));
+    protected final static String AUTH_PROVIDER = "digest";
+    protected final static byte[] AUTH = "hello".getBytes();
+    protected final static byte[] DATA = "Hint Water".getBytes();
+
+    protected TestableZooKeeper zkClient;
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+            new Object[][] {
+                {ServerState.LEADING},
+                {ServerState.FOLLOWING},
+                {ServerState.OBSERVING}});
+    }
+
+    public QuorumRequestPipelineTest(ServerState state) {
+        this.serverState = state;
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        CountdownWatcher clientWatch = new CountdownWatcher();
+        super.setUp(true);
+        zkClient = createClient(clientWatch, getPeersMatching(serverState));
+        zkClient.addAuthInfo(AUTH_PROVIDER, AUTH);
+        clientWatch.waitForConnected(CONNECTION_TIMEOUT);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        zkClient.close();
+        super.tearDown();
+    }
+
+    private Stat create2EmptyNode(TestableZooKeeper zkClient, String path) throws Exception {
+        Stat stat = new Stat();
+        zkClient.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        return stat;
+    }
+
+    @Test
+    public void testCreate() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Assert.assertArrayEquals(
+            String.format("%s Node created (create) with expected value", serverState),
+            DATA,
+            zkClient.getData(PARENT_PATH, false, null));
+    }
+
+    @Test
+    public void testCreate2() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, null);
+        Assert.assertArrayEquals(
+            String.format("%s Node created (create2) with expected value", serverState),
+            DATA,
+            zkClient.getData(PARENT_PATH, false, null));
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        zkClient.delete(PARENT_PATH, -1);
+        Assert.assertNull(
+            String.format("%s Node no longer exists", serverState),
+            zkClient.exists(PARENT_PATH, false));
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        Stat stat = create2EmptyNode(zkClient, PARENT_PATH);
+        Assert.assertEquals(
+            String.format("%s Exists returns correct node stat", serverState),
+            stat,
+            zkClient.exists(PARENT_PATH, false));
+    }
+
+    @Test
+    public void testSetAndGetData() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        zkClient.setData(PARENT_PATH, DATA, -1);
+        Assert.assertArrayEquals(
+            String.format("%s Node updated with expected value", serverState),
+            DATA,
+            zkClient.getData(PARENT_PATH, false, null));
+    }
+
+    @Test
+    public void testSetAndGetACL() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        Assert.assertEquals(
+            String.format("%s Node has open ACL", serverState),
+            Ids.OPEN_ACL_UNSAFE,
+            zkClient.getACL(PARENT_PATH, new Stat()));
+        zkClient.setACL(PARENT_PATH, Ids.READ_ACL_UNSAFE, -1);
+        Assert.assertEquals(
+            String.format("%s Node has world read-only ACL", serverState),
+            Ids.READ_ACL_UNSAFE,
+            zkClient.getACL(PARENT_PATH, new Stat()));
+    }
+
+    @Test
+    public void testSetAndGetChildren() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        for (String child : CHILDREN) {
+            create2EmptyNode(zkClient, PARENT_PATH + "/" + child);
+        }
+        Assert.assertEquals(
+            String.format("%s Parent has expected children", serverState),
+            CHILDREN,
+            new HashSet<String>(zkClient.getChildren(PARENT_PATH, false)));
+    }
+
+    @Test
+    public void testSetAndGetChildren2() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        for (String child : CHILDREN) {
+            create2EmptyNode(zkClient, PARENT_PATH + "/" + child);
+        }
+        Assert.assertEquals(
+            String.format("%s Parent has expected children", serverState),
+            CHILDREN,
+            new HashSet<String>(zkClient.getChildren(PARENT_PATH, false, null)));
+    }
+
+    @Test
+    public void testSync() throws Exception {
+        complete = false;
+        create2EmptyNode(zkClient, PARENT_PATH);
+        VoidCallback onSync = new VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                complete = true;
+                callComplete.countDown();
+            }
+        };
+        zkClient.sync(PARENT_PATH, onSync, null);
+        callComplete.await(30, TimeUnit.SECONDS);
+        Assert.assertTrue(
+            String.format("%s Sync completed", serverState),
+            complete);
+    }
+}

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Fri Jul 18 17:46:51 2014
@@ -43,6 +43,8 @@ import org.junit.Test;
 public class QuorumBase extends ClientBase {
     private static final Logger LOG = LoggerFactory.getLogger(QuorumBase.class);
 
+    private static final String LOCALADDR = "127.0.0.1";
+
     File s1dir, s2dir, s3dir, s4dir, s5dir;
     QuorumPeer s1, s2, s3, s4, s5;
     protected int port1;
@@ -136,29 +138,29 @@ public class QuorumBase extends ClientBa
         int syncLimit = 3;
         HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
         peers.put(Long.valueOf(1), new QuorumServer(1, 
-                new InetSocketAddress("127.0.0.1", port1 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE1 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient1),
+                new InetSocketAddress(LOCALADDR, port1 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE1 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient1),
                 LearnerType.PARTICIPANT));
         peers.put(Long.valueOf(2), new QuorumServer(2, 
-                new InetSocketAddress("127.0.0.1", port2 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE2 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient2),
+                new InetSocketAddress(LOCALADDR, port2 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE2 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient2),
                 LearnerType.PARTICIPANT));
         peers.put(Long.valueOf(3), new QuorumServer(3, 
-                new InetSocketAddress("127.0.0.1", port3 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE3 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient3),
+                new InetSocketAddress(LOCALADDR, port3 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE3 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient3),
                 LearnerType.PARTICIPANT));
         peers.put(Long.valueOf(4), new QuorumServer(4, 
-                new InetSocketAddress("127.0.0.1", port4 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE4 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient4),
+                new InetSocketAddress(LOCALADDR, port4 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE4 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient4),
                 LearnerType.PARTICIPANT));
         peers.put(Long.valueOf(5), new QuorumServer(5, 
-                new InetSocketAddress("127.0.0.1", port5 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE5 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient5),
+                new InetSocketAddress(LOCALADDR, port5 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE5 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient5),
                 LearnerType.PARTICIPANT));
         
         if (withObservers) {
@@ -262,6 +264,17 @@ public class QuorumBase extends ClientBa
       return -1;
     }
 
+    public String getPeersMatching(ServerState state) {
+        StringBuilder hosts = new StringBuilder();
+        for (QuorumPeer p : getPeerList()) {
+            if (p.getPeerState() == state) {
+                hosts.append(String.format("%s:%d,", LOCALADDR, p.getClientAddress().getPort()));
+            }
+        }
+        LOG.info("getPeersMatching ports are {}", hosts);
+        return hosts.toString();
+    }
+
     public ArrayList<QuorumPeer> getPeerList() {
         ArrayList<QuorumPeer> peers = new ArrayList<QuorumPeer>();
         peers.add(s1);
@@ -290,29 +303,29 @@ public class QuorumBase extends ClientBa
             peers = new HashMap<Long,QuorumServer>();
 
             peers.put(Long.valueOf(1), new QuorumServer(1, 
-                new InetSocketAddress("127.0.0.1", port1 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE1 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient1),
+                new InetSocketAddress(LOCALADDR, port1 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE1 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient1),
                 LearnerType.PARTICIPANT));
             peers.put(Long.valueOf(2), new QuorumServer(2, 
-                new InetSocketAddress("127.0.0.1", port2 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE2 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient2),
+                new InetSocketAddress(LOCALADDR, port2 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE2 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient2),
                 LearnerType.PARTICIPANT));
             peers.put(Long.valueOf(3), new QuorumServer(3, 
-                new InetSocketAddress("127.0.0.1", port3 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE3 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient3),
+                new InetSocketAddress(LOCALADDR, port3 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE3 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient3),
                 LearnerType.PARTICIPANT));
             peers.put(Long.valueOf(4), new QuorumServer(4, 
-                new InetSocketAddress("127.0.0.1", port4 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE4 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient4),
+                new InetSocketAddress(LOCALADDR, port4 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE4 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient4),
                 LearnerType.PARTICIPANT));
             peers.put(Long.valueOf(5), new QuorumServer(5, 
-                new InetSocketAddress("127.0.0.1", port5 + 1000),
-                new InetSocketAddress("127.0.0.1", portLE5 + 1000),
-                new InetSocketAddress("127.0.0.1", portClient5),
+                new InetSocketAddress(LOCALADDR, port5 + 1000),
+                new InetSocketAddress(LOCALADDR, portLE5 + 1000),
+                new InetSocketAddress(LOCALADDR, portClient5),
                 LearnerType.PARTICIPANT));
         }
         
@@ -413,4 +426,10 @@ public class QuorumBase extends ClientBa
         CountdownWatcher watcher = new CountdownWatcher();
         return createClient(watcher, hp);
     }
+
+    protected TestableZooKeeper createClient(CountdownWatcher watcher, ServerState state)
+        throws IOException, InterruptedException
+    {
+        return createClient(watcher, getPeersMatching(state));
+    }
 }