You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ra...@apache.org on 2014/07/18 19:46:52 UTC
svn commit: r1611732 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: rakeshr
Date: Fri Jul 18 17:46:51 2014
New Revision: 1611732
URL: http://svn.apache.org/r1611732
Log:
ZOOKEEPER-1851. Follower and Observer Request Processors Do Not Forward create2 Requests (Chris Chen via rakeshr)
Added:
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Jul 18 17:46:51 2014
@@ -705,6 +705,9 @@ BUGFIXES:
ZOOKEEPER-1969. Fix Port Already In Use for JettyAdminServerTest
(Hongchao Deng via phunt)
+ ZOOKEEPER-1851. Follower and Observer Request Processors Do Not Forward
+ create2 Requests (Chris Chen via rakeshr)
+
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java Fri Jul 18 17:46:51 2014
@@ -35,6 +35,8 @@ public class TraceFormatter {
return "notification";
case OpCode.create:
return "create";
+ case OpCode.create2:
+ return "create2";
case OpCode.delete:
return "delete";
case OpCode.exists:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Fri Jul 18 17:46:51 2014
@@ -131,6 +131,7 @@ public class CommitProcessor extends Zoo
protected boolean needCommit(Request request) {
switch (request.type) {
case OpCode.create:
+ case OpCode.create2:
case OpCode.delete:
case OpCode.setData:
case OpCode.reconfig:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Fri Jul 18 17:46:51 2014
@@ -82,6 +82,7 @@ public class FollowerRequestProcessor ex
zks.getFollower().request(request);
break;
case OpCode.create:
+ case OpCode.create2:
case OpCode.delete:
case OpCode.setData:
case OpCode.reconfig:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Fri Jul 18 17:46:51 2014
@@ -91,6 +91,7 @@ public class ObserverRequestProcessor ex
zks.getObserver().request(request);
break;
case OpCode.create:
+ case OpCode.create2:
case OpCode.delete:
case OpCode.setData:
case OpCode.reconfig:
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java?rev=1611732&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java Fri Jul 18 17:46:51 2014
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.QuorumBase;
+
+/**
+ * Runs the basic client operations against a quorum through a client that is
+ * connected only to the peers in one {@link ServerState}. The test is
+ * parameterized over LEADING, FOLLOWING and OBSERVING so that each operation
+ * is issued once through each kind of peer.
+ */
+@RunWith(Parameterized.class)
+public class QuorumRequestPipelineTest extends QuorumBase {
+    protected ServerState serverState;
+    protected final CountDownLatch callComplete = new CountDownLatch(1);
+    // Set from the sync callback and read by the test method afterwards.
+    protected volatile boolean complete = false;
+    protected final static String PARENT_PATH = "/foo";
+    protected final static HashSet<String> CHILDREN = new HashSet<String>(Arrays.asList("1", "2", "3"));
+    protected final static String AUTH_PROVIDER = "digest";
+    protected final static byte[] AUTH = "hello".getBytes();
+    protected final static byte[] DATA = "Hint Water".getBytes();
+
+    protected TestableZooKeeper zkClient;
+
+    /** Supplies one parameter set per server state under test. */
+    @Parameterized.Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(
+            new Object[][] {
+                {ServerState.LEADING},
+                {ServerState.FOLLOWING},
+                {ServerState.OBSERVING}});
+    }
+
+    public QuorumRequestPipelineTest(ServerState state) {
+        this.serverState = state;
+    }
+
+    /**
+     * Starts the quorum through {@code super.setUp(true)} and connects a
+     * client, with digest auth info added, to the peers currently in
+     * {@link #serverState}.
+     */
+    @Before
+    public void setUp() throws Exception {
+        CountdownWatcher clientWatch = new CountdownWatcher();
+        super.setUp(true);
+        zkClient = createClient(clientWatch, getPeersMatching(serverState));
+        zkClient.addAuthInfo(AUTH_PROVIDER, AUTH);
+        clientWatch.waitForConnected(CONNECTION_TIMEOUT);
+    }
+
+    /**
+     * Closes the client and shuts the quorum down. JUnit 4 still runs
+     * {@code @After} methods when {@code @Before} threw, so the client may
+     * be null here; the quorum is stopped even if closing the client fails.
+     */
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (zkClient != null) {
+                zkClient.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Creates an empty persistent node with the Stat-returning variant of
+     * create and returns the Stat that call filled in.
+     */
+    private Stat create2EmptyNode(TestableZooKeeper zkClient, String path) throws Exception {
+        Stat stat = new Stat();
+        zkClient.create(path, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, stat);
+        return stat;
+    }
+
+    @Test
+    public void testCreate() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Assert.assertArrayEquals(
+                String.format("%s Node created (create) with expected value", serverState),
+                DATA,
+                zkClient.getData(PARENT_PATH, false, null));
+    }
+
+    @Test
+    public void testCreate2() throws Exception {
+        zkClient.create(PARENT_PATH, DATA, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, null);
+        Assert.assertArrayEquals(
+                String.format("%s Node created (create2) with expected value", serverState),
+                DATA,
+                zkClient.getData(PARENT_PATH, false, null));
+    }
+
+    @Test
+    public void testDelete() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        zkClient.delete(PARENT_PATH, -1);
+        Assert.assertNull(
+                String.format("%s Node no longer exists", serverState),
+                zkClient.exists(PARENT_PATH, false));
+    }
+
+    @Test
+    public void testExists() throws Exception {
+        Stat stat = create2EmptyNode(zkClient, PARENT_PATH);
+        Assert.assertEquals(
+                String.format("%s Exists returns correct node stat", serverState),
+                stat,
+                zkClient.exists(PARENT_PATH, false));
+    }
+
+    @Test
+    public void testSetAndGetData() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        zkClient.setData(PARENT_PATH, DATA, -1);
+        Assert.assertArrayEquals(
+                String.format("%s Node updated with expected value", serverState),
+                DATA,
+                zkClient.getData(PARENT_PATH, false, null));
+    }
+
+    @Test
+    public void testSetAndGetACL() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        Assert.assertEquals(
+                String.format("%s Node has open ACL", serverState),
+                Ids.OPEN_ACL_UNSAFE,
+                zkClient.getACL(PARENT_PATH, new Stat()));
+        zkClient.setACL(PARENT_PATH, Ids.READ_ACL_UNSAFE, -1);
+        Assert.assertEquals(
+                String.format("%s Node has world read-only ACL", serverState),
+                Ids.READ_ACL_UNSAFE,
+                zkClient.getACL(PARENT_PATH, new Stat()));
+    }
+
+    @Test
+    public void testSetAndGetChildren() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        for (String child : CHILDREN) {
+            create2EmptyNode(zkClient, PARENT_PATH + "/" + child);
+        }
+        Assert.assertEquals(
+                String.format("%s Parent has expected children", serverState),
+                CHILDREN,
+                new HashSet<String>(zkClient.getChildren(PARENT_PATH, false)));
+    }
+
+    @Test
+    public void testSetAndGetChildren2() throws Exception {
+        create2EmptyNode(zkClient, PARENT_PATH);
+        for (String child : CHILDREN) {
+            create2EmptyNode(zkClient, PARENT_PATH + "/" + child);
+        }
+        Assert.assertEquals(
+                String.format("%s Parent has expected children", serverState),
+                CHILDREN,
+                new HashSet<String>(zkClient.getChildren(PARENT_PATH, false, null)));
+    }
+
+    @Test
+    public void testSync() throws Exception {
+        complete = false;
+        create2EmptyNode(zkClient, PARENT_PATH);
+        VoidCallback onSync = new VoidCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx) {
+                complete = true;
+                callComplete.countDown();
+            }
+        };
+        zkClient.sync(PARENT_PATH, onSync, null);
+        // await() returns false on timeout; check it so a missing callback
+        // is reported as such rather than falling through unnoticed.
+        Assert.assertTrue(
+                String.format("%s Sync callback invoked within 30 seconds", serverState),
+                callComplete.await(30, TimeUnit.SECONDS));
+        Assert.assertTrue(
+                String.format("%s Sync completed", serverState),
+                complete);
+    }
+}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=1611732&r1=1611731&r2=1611732&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Fri Jul 18 17:46:51 2014
@@ -43,6 +43,8 @@ import org.junit.Test;
public class QuorumBase extends ClientBase {
private static final Logger LOG = LoggerFactory.getLogger(QuorumBase.class);
+ private static final String LOCALADDR = "127.0.0.1";
+
File s1dir, s2dir, s3dir, s4dir, s5dir;
QuorumPeer s1, s2, s3, s4, s5;
protected int port1;
@@ -136,29 +138,29 @@ public class QuorumBase extends ClientBa
int syncLimit = 3;
HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
peers.put(Long.valueOf(1), new QuorumServer(1,
- new InetSocketAddress("127.0.0.1", port1 + 1000),
- new InetSocketAddress("127.0.0.1", portLE1 + 1000),
- new InetSocketAddress("127.0.0.1", portClient1),
+ new InetSocketAddress(LOCALADDR, port1 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE1 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient1),
LearnerType.PARTICIPANT));
peers.put(Long.valueOf(2), new QuorumServer(2,
- new InetSocketAddress("127.0.0.1", port2 + 1000),
- new InetSocketAddress("127.0.0.1", portLE2 + 1000),
- new InetSocketAddress("127.0.0.1", portClient2),
+ new InetSocketAddress(LOCALADDR, port2 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE2 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient2),
LearnerType.PARTICIPANT));
peers.put(Long.valueOf(3), new QuorumServer(3,
- new InetSocketAddress("127.0.0.1", port3 + 1000),
- new InetSocketAddress("127.0.0.1", portLE3 + 1000),
- new InetSocketAddress("127.0.0.1", portClient3),
+ new InetSocketAddress(LOCALADDR, port3 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE3 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient3),
LearnerType.PARTICIPANT));
peers.put(Long.valueOf(4), new QuorumServer(4,
- new InetSocketAddress("127.0.0.1", port4 + 1000),
- new InetSocketAddress("127.0.0.1", portLE4 + 1000),
- new InetSocketAddress("127.0.0.1", portClient4),
+ new InetSocketAddress(LOCALADDR, port4 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE4 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient4),
LearnerType.PARTICIPANT));
peers.put(Long.valueOf(5), new QuorumServer(5,
- new InetSocketAddress("127.0.0.1", port5 + 1000),
- new InetSocketAddress("127.0.0.1", portLE5 + 1000),
- new InetSocketAddress("127.0.0.1", portClient5),
+ new InetSocketAddress(LOCALADDR, port5 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE5 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient5),
LearnerType.PARTICIPANT));
if (withObservers) {
@@ -262,6 +264,17 @@ public class QuorumBase extends ClientBa
return -1;
}
+    /**
+     * Builds a client connect string for the peers currently in the given state.
+     *
+     * @param state the peer state to select
+     * @return comma-separated LOCALADDR:clientPort entries, one for every peer
+     *         in getPeerList() whose state equals {@code state}; empty if no
+     *         peer matches
+     */
+    public String getPeersMatching(ServerState state) {
+        StringBuilder hosts = new StringBuilder();
+        for (QuorumPeer p : getPeerList()) {
+            if (p.getPeerState() == state) {
+                // Separate entries with a comma instead of terminating each
+                // one, so the result has no dangling trailing comma.
+                if (hosts.length() > 0) {
+                    hosts.append(',');
+                }
+                hosts.append(String.format("%s:%d", LOCALADDR, p.getClientAddress().getPort()));
+            }
+        }
+        LOG.info("getPeersMatching ports are {}", hosts);
+        return hosts.toString();
+    }
+
public ArrayList<QuorumPeer> getPeerList() {
ArrayList<QuorumPeer> peers = new ArrayList<QuorumPeer>();
peers.add(s1);
@@ -290,29 +303,29 @@ public class QuorumBase extends ClientBa
peers = new HashMap<Long,QuorumServer>();
peers.put(Long.valueOf(1), new QuorumServer(1,
- new InetSocketAddress("127.0.0.1", port1 + 1000),
- new InetSocketAddress("127.0.0.1", portLE1 + 1000),
- new InetSocketAddress("127.0.0.1", portClient1),
+ new InetSocketAddress(LOCALADDR, port1 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE1 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient1),
LearnerType.PARTICIPANT));
peers.put(Long.valueOf(2), new QuorumServer(2,
- new InetSocketAddress("127.0.0.1", port2 + 1000),
- new InetSocketAddress("127.0.0.1", portLE2 + 1000),
- new InetSocketAddress("127.0.0.1", portClient2),
+ new InetSocketAddress(LOCALADDR, port2 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE2 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient2),
LearnerType.PARTICIPANT));
peers.put(Long.valueOf(3), new QuorumServer(3,
- new InetSocketAddress("127.0.0.1", port3 + 1000),
- new InetSocketAddress("127.0.0.1", portLE3 + 1000),
- new InetSocketAddress("127.0.0.1", portClient3),
+ new InetSocketAddress(LOCALADDR, port3 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE3 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient3),
LearnerType.PARTICIPANT));
peers.put(Long.valueOf(4), new QuorumServer(4,
- new InetSocketAddress("127.0.0.1", port4 + 1000),
- new InetSocketAddress("127.0.0.1", portLE4 + 1000),
- new InetSocketAddress("127.0.0.1", portClient4),
+ new InetSocketAddress(LOCALADDR, port4 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE4 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient4),
LearnerType.PARTICIPANT));
peers.put(Long.valueOf(5), new QuorumServer(5,
- new InetSocketAddress("127.0.0.1", port5 + 1000),
- new InetSocketAddress("127.0.0.1", portLE5 + 1000),
- new InetSocketAddress("127.0.0.1", portClient5),
+ new InetSocketAddress(LOCALADDR, port5 + 1000),
+ new InetSocketAddress(LOCALADDR, portLE5 + 1000),
+ new InetSocketAddress(LOCALADDR, portClient5),
LearnerType.PARTICIPANT));
}
@@ -413,4 +426,10 @@ public class QuorumBase extends ClientBa
CountdownWatcher watcher = new CountdownWatcher();
return createClient(watcher, hp);
}
+
+    /**
+     * Creates a client whose connect string is the result of
+     * getPeersMatching(state).
+     *
+     * @param watcher watcher handed to the created client
+     * @param state peer state used to select the servers to connect to
+     * @return the client returned by createClient(watcher, hostPorts)
+     */
+    protected TestableZooKeeper createClient(CountdownWatcher watcher, ServerState state)
+        throws IOException, InterruptedException
+    {
+        String hostPorts = getPeersMatching(state);
+        return createClient(watcher, hostPorts);
+    }
}