You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by he...@apache.org on 2011/05/20 01:24:52 UTC

svn commit: r1125179 - in /zookeeper/trunk/src/java: main/org/apache/zookeeper/server/quorum/ test/org/apache/zookeeper/test/

Author: henry
Date: Thu May 19 23:24:52 2011
New Revision: 1125179

URL: http://svn.apache.org/viewvc?rev=1125179&view=rev
Log:
ZOOKEEPER-784. Server-side functionality for read-only mode (Sergey Doroshenko via henryr) - add missing files

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java?rev=1125179&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyBean.java Thu May 19 23:24:52 2011
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+
+/**
+ * JMX bean for a server running in read-only mode. It inherits all of its
+ * attributes from ZooKeeperServerBean and only overrides the name under which
+ * the bean is reported.
+ */
+public class ReadOnlyBean extends ZooKeeperServerBean {
+
+    /**
+     * @param zks the server this bean describes; handed to the superclass as is
+     */
+    public ReadOnlyBean(ZooKeeperServer zks) {
+        super(zks);
+    }
+
+    /**
+     * @return the constant name "ReadOnlyServer"
+     */
+    @Override
+    public String getName() {
+        return "ReadOnlyServer";
+    }
+
+}

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java?rev=1125179&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java Thu May 19 23:24:52 2011
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooTrace;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This processor is at the beginning of the ReadOnlyZooKeeperServer's
+ * processors chain. All it does is, it passes read-only operations (e.g.
+ * OpCode.getData, OpCode.exists) through to the next processor, but drops
+ * state-changing operations (e.g. OpCode.create, OpCode.setData).
+ */
+public class ReadOnlyRequestProcessor extends Thread implements RequestProcessor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyRequestProcessor.class);
+
+    private LinkedBlockingQueue<Request> queuedRequests = new LinkedBlockingQueue<Request>();
+
+    private boolean finished = false;
+
+    private RequestProcessor nextProcessor;
+
+    private ZooKeeperServer zks;
+
+    public ReadOnlyRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
+        super("ReadOnlyRequestProcessor:" + zks.getServerId());
+        this.zks = zks;
+        this.nextProcessor = nextProcessor;
+    }
+
+    public void run() {
+        try {
+            while (!finished) {
+                Request request = queuedRequests.take();
+
+                // log request
+                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
+                if (request.type == OpCode.ping) {
+                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
+                }
+                if (LOG.isTraceEnabled()) {
+                    ZooTrace.logRequest(LOG, traceMask, 'R', request, "");
+                }
+                if (Request.requestOfDeath == request) {
+                    break;
+                }
+
+                // filter read requests
+                switch (request.type) {
+                case OpCode.sync:
+                case OpCode.create:
+                case OpCode.delete:
+                case OpCode.setData:
+                case OpCode.setACL:
+
+                    ReplyHeader hdr = new ReplyHeader(request.cxid, zks.getZKDatabase()
+                            .getDataTreeLastProcessedZxid(), Code.NOTREADONLY.intValue());
+                    try {
+                        request.cnxn.sendResponse(hdr, null, null);
+                    } catch (IOException e) {
+                        LOG.error("IO exception while sending response", e);
+                    }
+                    continue;
+                }
+
+                // proceed to the next processor
+                if (nextProcessor != null) {
+                    nextProcessor.processRequest(request);
+                }
+            }
+        } catch (InterruptedException e) {
+            LOG.error("Unexpected interruption", e);
+        }
+        LOG.info("ReadOnlyRequestProcessor exited loop!");
+    }
+
+    @Override
+    public void processRequest(Request request) {
+        if (!finished) {
+            queuedRequests.add(request);
+        }
+    }
+
+    @Override
+    public void shutdown() {
+        finished = true;
+        queuedRequests.clear();
+        queuedRequests.add(Request.requestOfDeath);
+        nextProcessor.shutdown();
+    }
+
+}

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java?rev=1125179&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java Thu May 19 23:24:52 2011
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServerBean;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+
+/**
+ * ZooKeeperServer used while the peer is partitioned from the majority. It
+ * serves read-only clients, but drops connections from clients that are not
+ * read-only.
+ * <p>
+ * Its request processor chain starts with a ReadOnlyRequestProcessor, which
+ * keeps state-changing requests away from the rest of the chain.
+ */
+public class ReadOnlyZooKeeperServer extends QuorumZooKeeperServer {
+
+    /**
+     * Builds the server from the peer's settings (tick time and session
+     * timeout bounds are read from {@code self}).
+     */
+    ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
+            DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
+        super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout,
+                treeBuilder, zkDb, self);
+    }
+
+    /**
+     * Wires the chain ReadOnlyRequestProcessor -> PrepRequestProcessor ->
+     * FinalRequestProcessor and starts the two threaded processors, the inner
+     * one first.
+     */
+    @Override
+    protected void setupRequestProcessors() {
+        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+        PrepRequestProcessor prep = new PrepRequestProcessor(this, finalProcessor);
+        prep.start();
+
+        ReadOnlyRequestProcessor readOnly = new ReadOnlyRequestProcessor(this, prep);
+        firstProcessor = readOnly;
+        readOnly.start();
+    }
+
+    /**
+     * Registers the server bean, runs the regular startup and finally makes
+     * this server the one used by the peer's connection factory.
+     */
+    @Override
+    public void startup() {
+        ReadOnlyBean serverBean = new ReadOnlyBean(this);
+        registerJMX(serverBean, self.jmxLocalPeerBean);
+
+        super.startup();
+
+        self.cnxnFactory.setZooKeeperServer(this);
+    }
+
+    /**
+     * Registers a DataTreeBean for this server's data tree underneath the
+     * server bean. A failure is logged and leaves no data tree bean behind.
+     */
+    @Override
+    protected void registerJMX() {
+        try {
+            DataTreeBean dataTreeBean = new DataTreeBean(getZKDatabase().getDataTree());
+            jmxDataTreeBean = dataTreeBean;
+            MBeanRegistry.getInstance().register(dataTreeBean, jmxServerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            jmxDataTreeBean = null;
+        }
+    }
+
+    /**
+     * Remembers the given server bean and registers it underneath the local
+     * peer bean. A failure is logged and leaves no server bean behind.
+     *
+     * @param serverBean bean describing this server
+     * @param localPeerBean parent bean to register under
+     */
+    public void registerJMX(ZooKeeperServerBean serverBean, LocalPeerBean localPeerBean) {
+        jmxServerBean = serverBean;
+        try {
+            MBeanRegistry.getInstance().register(serverBean, localPeerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            jmxServerBean = null;
+        }
+    }
+
+    /**
+     * Unregisters the data tree bean, if one is registered, and forgets it.
+     * A failure is only logged.
+     */
+    @Override
+    protected void unregisterJMX() {
+        if (jmxDataTreeBean != null) {
+            try {
+                MBeanRegistry.getInstance().unregister(jmxDataTreeBean);
+            } catch (Exception e) {
+                LOG.warn("Failed to unregister with JMX", e);
+            }
+        }
+        jmxDataTreeBean = null;
+    }
+
+    /**
+     * Unregisters the server bean, if one is registered, and forgets it.
+     * A failure is only logged.
+     *
+     * @param zks not used by this implementation
+     */
+    protected void unregisterJMX(ZooKeeperServer zks) {
+        if (jmxServerBean != null) {
+            try {
+                MBeanRegistry.getInstance().unregister(jmxServerBean);
+            } catch (Exception e) {
+                LOG.warn("Failed to unregister with JMX", e);
+            }
+        }
+        jmxServerBean = null;
+    }
+
+    /**
+     * @return the constant state name "read-only"
+     */
+    @Override
+    public String getState() {
+        return "read-only";
+    }
+
+    /**
+     * Uses the id of the associated QuorumPeer as the unique id of this
+     * server.
+     *
+     * @return the peer's id
+     */
+    @Override
+    public long getServerId() {
+        return self.getId();
+    }
+
+    /**
+     * Takes the server out of service: the server bean is unregistered, the
+     * connection factory is detached from this server and all its connections
+     * are closed, and only then the regular shutdown runs.
+     */
+    @Override
+    public void shutdown() {
+        unregisterJMX(this);
+
+        // detach from the peer's connection factory, then drop every connection
+        self.cnxnFactory.setZooKeeperServer(null);
+        self.cnxnFactory.closeAll();
+
+        // finally shut down the server itself
+        super.shutdown();
+    }
+
+}

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java?rev=1125179&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java Thu May 19 23:24:52 2011
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.LineNumberReader;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import junit.framework.Assert;
+
+import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.WriterAppender;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.KeeperException.NotReadOnlyException;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooKeeper.States;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests of the read-only mode: a client created with the read-only flag keeps
+ * working against a server that lost its quorum, and moves back to a
+ * read-write server once one is available.
+ * <p>
+ * The tests run against their own QuorumUtil instance, which is started in
+ * setUp() and torn down in tearDown().
+ */
+public class ReadOnlyModeTest extends QuorumBase {
+
+    private QuorumUtil qu = new QuorumUtil(1);
+
+    @Before
+    @Override
+    public void setUp() throws Exception {
+        qu.startQuorum();
+    }
+
+    @After
+    @Override
+    public void tearDown() throws Exception {
+        qu.tearDown();
+    }
+
+    /**
+     * Basic test of read-only client functionality. Tries to read and write
+     * during read-only mode, then regains a quorum and tries to write again.
+     */
+    @Test
+    public void testReadOnlyClient() throws Exception {
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
+        // close the handle even when an assertion below fails
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected
+
+            final String data = "Data to be read in RO mode";
+            final String node = "/tnode";
+            zk.create(node, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+            watcher.reset();
+            qu.shutdown(2);
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+            // read operation during r/o mode
+            String remoteData = new String(zk.getData(node, false, null));
+            Assert.assertEquals(data, remoteData);
+
+            try {
+                zk.setData(node, "no way".getBytes(), -1);
+                Assert.fail("Write operation has succeeded during RO mode");
+            } catch (NotReadOnlyException e) {
+                // ok
+            }
+
+            watcher.reset();
+            qu.start(2);
+            Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
+                    "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            zk.setData(node, "We're in the quorum now".getBytes(), -1);
+        } finally {
+            zk.close();
+        }
+    }
+
+    /**
+     * Ensures that upon connection to a read-only server client receives
+     * ConnectedReadOnly state notification.
+     */
+    @Test
+    public void testConnectionEvents() throws Exception {
+        // filled by the watcher callback and read by the test thread, so every
+        // access is guarded by the list's monitor
+        final List<KeeperState> states = new ArrayList<KeeperState>();
+        ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                new Watcher() {
+                    public void process(WatchedEvent event) {
+                        synchronized (states) {
+                            states.add(event.getState());
+                        }
+                    }
+                }, true);
+        try {
+            Thread.sleep(1000);
+            zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+
+            // kill peer and wait no more than 5 seconds for read-only server
+            // to be started (which should take one tickTime (2 seconds))
+            qu.shutdown(2);
+            long start = System.currentTimeMillis();
+            while (zk.getState() != States.CONNECTEDREADONLY) {
+                Thread.sleep(200);
+                Assert.assertTrue("Can't connect to the server",
+                        System.currentTimeMillis() - start < 5000);
+            }
+
+            // The watcher is invoked asynchronously with respect to this
+            // thread, so the event may not have been recorded yet although
+            // getState() already reports read-only. Give it the rest of the
+            // 5 seconds budget to show up.
+            List<KeeperState> seen = snapshot(states);
+            while (seen.size() < 3 && System.currentTimeMillis() - start < 5000) {
+                Thread.sleep(50);
+                seen = snapshot(states);
+            }
+
+            // At this point states list should contain, in the given order,
+            // SyncConnected, Disconnected, and ConnectedReadOnly states
+            Assert.assertTrue("ConnectedReadOnly event wasn't received, got " + seen,
+                    seen.size() > 2 && seen.get(2) == KeeperState.ConnectedReadOnly);
+        } finally {
+            zk.close();
+        }
+    }
+
+    /** Returns a copy of the list taken while holding the list's monitor. */
+    private static List<KeeperState> snapshot(List<KeeperState> states) {
+        synchronized (states) {
+            return new ArrayList<KeeperState>(states);
+        }
+    }
+
+    /**
+     * Tests a situation when client firstly connects to a read-only server and
+     * then connects to a majority server. Transition should be transparent for
+     * the user.
+     */
+    @Test
+    public void testSessionEstablishment() throws Exception {
+        qu.shutdown(2);
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            Assert.assertSame("should be in r/o mode", States.CONNECTEDREADONLY,
+                    zk.getState());
+            long fakeId = zk.getSessionId();
+
+            watcher.reset();
+            qu.start(2);
+            Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
+                    "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            zk.create("/test", "test".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            Assert.assertFalse("fake session and real session have same id",
+                    zk.getSessionId() == fakeId);
+        } finally {
+            zk.close();
+        }
+    }
+
+    /**
+     * Ensures that client seeks for r/w servers while it's connected to r/o
+     * server. The check is done on the log output, which has to contain a
+     * "Majority server found" line.
+     */
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testSeekForRwServer() throws Exception {
+
+        // setup the logger to capture all logs
+        Layout layout = Logger.getRootLogger().getAppender("CONSOLE")
+                .getLayout();
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        WriterAppender appender = new WriterAppender(layout, os);
+        appender.setImmediateFlush(true);
+        appender.setThreshold(Level.INFO);
+        Logger zlogger = Logger.getLogger("org.apache.zookeeper");
+        zlogger.addAppender(appender);
+
+        try {
+            qu.shutdown(2);
+            CountdownWatcher watcher = new CountdownWatcher();
+            ZooKeeper zk = new ZooKeeper(qu.getConnString(),
+                    CONNECTION_TIMEOUT, watcher, true);
+            try {
+                watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+                // if we don't suspend a peer it will rejoin a quorum
+                qu.getPeer(1).peer.suspend();
+                try {
+                    // start two servers to form a quorum; client should detect
+                    // this and connect to one of them
+                    watcher.reset();
+                    qu.start(2);
+                    qu.start(3);
+                    watcher.waitForConnected(CONNECTION_TIMEOUT);
+                    zk.create("/test", "test".getBytes(),
+                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                } finally {
+                    // resume poor fellow, also when the steps above failed, so
+                    // that no suspended thread is left behind for tearDown()
+                    qu.getPeer(1).peer.resume();
+                }
+            } finally {
+                zk.close();
+            }
+        } finally {
+            zlogger.removeAppender(appender);
+        }
+
+        os.close();
+        LineNumberReader r = new LineNumberReader(new StringReader(os
+                .toString()));
+        String line;
+        Pattern p = Pattern.compile(".*Majority server found.*");
+        boolean found = false;
+        while ((line = r.readLine()) != null) {
+            if (p.matcher(line).matches()) {
+                found = true;
+                break;
+            }
+        }
+        Assert.assertTrue(
+                "Majority server wasn't found while connected to r/o server",
+                found);
+    }
+}