You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by he...@apache.org on 2011/05/20 01:24:52 UTC
svn commit: r1125179 - in /zookeeper/trunk/src/java:
main/org/apache/zookeeper/server/quorum/ test/org/apache/zookeeper/test/
Author: henry
Date: Thu May 19 23:24:52 2011
New Revision: 1125179
URL: http://svn.apache.org/viewvc?rev=1125179&view=rev
Log:
ZOOKEEPER-784. Server-side functionality for read-only mode (Sergey Doroshenko via henryr) - add missing files
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java?rev=1125179&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java Thu May 19 23:24:52 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+
+/**
+ * ReadOnly MX Bean interface, implemented by ReadOnlyBean
+ *
+ */
+public class ReadOnlyBean extends ZooKeeperServerBean {
+
+ public ReadOnlyBean(ZooKeeperServer zks) {
+ super(zks);
+ }
+
+ public String getName() {
+ return "ReadOnlyServer";
+ }
+
+}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java?rev=1125179&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java Thu May 19 23:24:52 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This processor is at the beginning of the ReadOnlyZooKeeperServer's
+ * processors chain. All it does is, it passes read-only operations (e.g.
+ * OpCode.getData, OpCode.exists) through to the next processor, but drops
+ * state-changing operations (e.g. OpCode.create, OpCode.setData).
+ */
+public class ReadOnlyRequestProcessor extends Thread implements RequestProcessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyRequestProcessor.class);
+
+ private LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+
+ private boolean finished = false;
+
+ private RequestProcessor nextProcessor;
+
+ private ZooKeeperServer zks;
+
+ public ReadOnlyRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
+ super("ReadOnlyRequestProcessor:" + zks.getServerId());
+ this.zks = zks;
+ this.nextProcessor = nextProcessor;
+ }
+
+ public void run() {
+ try {
+ while (!finished) {
+ Request request = queuedRequests.take();
+
+ // log request
+ long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
+ if (request.type == OpCode.ping) {
+ traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
+ }
+ if (LOG.isTraceEnabled()) {
+ ZooTrace.logRequest(LOG, traceMask, 'R', request, "");
+ }
+ if (Request.requestOfDeath == request) {
+ break;
+ }
+
+ // filter read requests
+ switch (request.type) {
+ case OpCode.sync:
+ case OpCode.create:
+ case OpCode.delete:
+ case OpCode.setData:
+ case OpCode.setACL:
+
+ ReplyHeader hdr = new ReplyHeader(request.cxid, zks.getZKDatabase()
+ .getDataTreeLastProcessedZxid(), Code.NOTREADONLY.intValue());
+ try {
+ request.cnxn.sendResponse(hdr, null, null);
+ } catch (IOException e) {
+ LOG.error("IO exception while sending response", e);
+ }
+ continue;
+ }
+
+ // proceed to the next processor
+ if (nextProcessor != null) {
+ nextProcessor.processRequest(request);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Unexpected interruption", e);
+ }
+ LOG.info("ReadOnlyRequestProcessor exited loop!");
+ }
+
+ @Override
+ public void processRequest(Request request) {
+ if (!finished) {
+ queuedRequests.add(request);
+ }
+ }
+
+ @Override
+ public void shutdown() {
+ finished = true;
+ queuedRequests.clear();
+ queuedRequests.add(Request.requestOfDeath);
+ nextProcessor.shutdown();
+ }
+
+}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java?rev=1125179&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java Thu May 19 23:24:52 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * A ZooKeeperServer which comes into play when peer is partitioned from the
+ * majority. Handles read-only clients, but drops connections from not-read-only
+ * ones.
+ * <p>
+ * The very first processor in the chain of request processors is a
+ * ReadOnlyRequestProcessor which drops state-changing requests.
+ */
+public class ReadOnlyZooKeeperServer extends QuorumZooKeeperServer {
+
+ ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
+ DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
+ super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout,
+ treeBuilder, zkDb, self);
+ }
+
+ @Override
+ protected void setupRequestProcessors() {
+ RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+ RequestProcessor prepProcessor = new PrepRequestProcessor(this, finalProcessor);
+ ((PrepRequestProcessor) prepProcessor).start();
+ firstProcessor = new ReadOnlyRequestProcessor(this, prepProcessor);
+ ((ReadOnlyRequestProcessor) firstProcessor).start();
+ }
+
+ @Override
+ public void startup() {
+ registerJMX(new ReadOnlyBean(this), self.jmxLocalPeerBean);
+ super.startup();
+
+ self.cnxnFactory.setZooKeeperServer(this);
+ }
+
+ @Override
+ protected void registerJMX() {
+ // register with JMX
+ try {
+ jmxDataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
+ MBeanRegistry.getInstance().register(jmxDataTreeBean, jmxServerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxDataTreeBean = null;
+ }
+ }
+
+ public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
+ // register with JMX
+ try {
+ jmxServerBean = serverBean;
+ MBeanRegistry.getInstance().register(serverBean, localPeerBean);
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ jmxServerBean = null;
+ }
+ }
+
+ @Override
+ protected void unregisterJMX() {
+ // unregister from JMX
+ try {
+ if (jmxDataTreeBean != null) {
+ MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ jmxDataTreeBean = null;
+ }
+
+ protected void unregisterJMX(ZooKeeperServer zks) {
+ // unregister from JMX
+ try {
+ if (jmxServerBean != null) {
+ MBeanRegistry.getInstance().unregister(jmxServerBean);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to unregister with JMX", e);
+ }
+ jmxServerBean = null;
+ }
+
+ @Override
+ public String getState() {
+ return "read-only";
+ }
+
+ /**
+ * Returns the id of the associated QuorumPeer, which will do for a unique
+ * id of this server.
+ */
+ @Override
+ public long getServerId() {
+ return self.getId();
+ }
+
+ @Override
+ public void shutdown() {
+ unregisterJMX(this);
+
+ // set peer's server to null
+ self.cnxnFactory.setZooKeeperServer(null);
+ // clear all the connections
+ self.cnxnFactory.closeAll();
+
+ // shutdown the server itself
+ super.shutdown();
+ }
+
+}
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java?rev=1125179&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java Thu May 19 23:24:52 2011
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.LineNumberReader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import junit.framework.Assert;
+
+import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.WriterAppender;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NotReadOnlyException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class ReadOnlyModeTest extends QuorumBase {
+
+ private QuorumUtil qu = new QuorumUtil(1);
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ qu.startQuorum();
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception {
+ qu.tearDown();
+ }
+
+ /**
+ * Basic test of read-only client functionality. Tries to read and write
+ * during read-only mode, then regains a quorum and tries to write again.
+ */
+ @Test
+ public void testReadOnlyClient() throws Exception {
+ CountdownWatcher watcher = new CountdownWatcher();
+ ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+ watcher, true);
+ watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected
+
+ final String data = "Data to be read in RO mode";
+ final String node = "/tnode";
+ zk.create(node, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ watcher.reset();
+ qu.shutdown(2);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+ // read operation during r/o mode
+ String remoteData = new String(zk.getData(node, false, null));
+ Assert.assertEquals(data, remoteData);
+
+ try {
+ zk.setData(node, "no way".getBytes(), -1);
+ Assert.fail("Write operation has succeeded during RO mode");
+ } catch (NotReadOnlyException e) {
+ // ok
+ }
+
+ watcher.reset();
+ qu.start(2);
+ Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
+ "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ zk.setData(node, "We're in the quorum now".getBytes(), -1);
+
+ zk.close();
+ }
+
+ /**
+ * Ensures that upon connection to a read-only server client receives
+ * ConnectedReadOnly state notification.
+ */
+ @Test
+ public void testConnectionEvents() throws Exception {
+ final List<KeeperState> states = new ArrayList<KeeperState>();
+ ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+ new Watcher() {
+ public void process(WatchedEvent event) {
+ states.add(event.getState());
+ }
+ }, true);
+
+ Thread.sleep(1000);
+ zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ // kill peer and wait no more than 5 seconds for read-only server
+ // to be started (which should take one tickTime (2 seconds))
+ qu.shutdown(2);
+ long start = System.currentTimeMillis();
+ while (!(zk.getState() == States.CONNECTEDREADONLY)) {
+ Thread.sleep(200);
+ Assert.assertTrue("Can't connect to the server", System
+ .currentTimeMillis()
+ - start < 5000);
+ }
+
+ // At this point states list should contain, in the given order,
+ // SyncConnected, Disconnected, and ConnectedReadOnly states
+ Assert.assertTrue("ConnectedReadOnly event wasn't received", states
+ .get(2) == KeeperState.ConnectedReadOnly);
+ zk.close();
+ }
+
+ /**
+ * Tests a situation when client firstly connects to a read-only server and
+ * then connects to a majority server. Transition should be transparent for
+ * the user.
+ */
+ @Test
+ public void testSessionEstablishment() throws Exception {
+ qu.shutdown(2);
+
+ CountdownWatcher watcher = new CountdownWatcher();
+ ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+ watcher, true);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ Assert.assertSame("should be in r/o mode", States.CONNECTEDREADONLY, zk
+ .getState());
+ long fakeId = zk.getSessionId();
+
+ watcher.reset();
+ qu.start(2);
+ Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
+ "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ Assert.assertFalse("fake session and real session have same id", zk
+ .getSessionId() == fakeId);
+
+ zk.close();
+ }
+
+ /**
+ * Ensures that client seeks for r/w servers while it's connected to r/o
+ * server.
+ */
+ @SuppressWarnings("deprecation")
+ @Test
+ public void testSeekForRwServer() throws Exception {
+
+ // setup the logger to capture all logs
+ Layout layout = Logger.getRootLogger().getAppender("CONSOLE")
+ .getLayout();
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ WriterAppender appender = new WriterAppender(layout, os);
+ appender.setImmediateFlush(true);
+ appender.setThreshold(Level.INFO);
+ Logger zlogger = Logger.getLogger("org.apache.zookeeper");
+ zlogger.addAppender(appender);
+
+ try {
+ qu.shutdown(2);
+ CountdownWatcher watcher = new CountdownWatcher();
+ ZooKeeper zk = new ZooKeeper(qu.getConnString(),
+ CONNECTION_TIMEOUT, watcher, true);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+ // if we don't suspend a peer it will rejoin a quorum
+ qu.getPeer(1).peer.suspend();
+
+ // start two servers to form a quorum; client should detect this and
+ // connect to one of them
+ watcher.reset();
+ qu.start(2);
+ qu.start(3);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+ zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ // resume poor fellow
+ qu.getPeer(1).peer.resume();
+ } finally {
+ zlogger.removeAppender(appender);
+ }
+
+ os.close();
+ LineNumberReader r = new LineNumberReader(new StringReader(os
+ .toString()));
+ String line;
+ Pattern p = Pattern.compile(".*Majority server found.*");
+ boolean found = false;
+ while ((line = r.readLine()) != null) {
+ if (p.matcher(line).matches()) {
+ found = true;
+ break;
+ }
+ }
+ Assert.assertTrue(
+ "Majority server wasn't found while connected to r/o server",
+ found);
+ }
+}