You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2009/08/03 23:27:09 UTC

svn commit: r800574 - in /hadoop/zookeeper/trunk: CHANGES.txt src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

Author: mahadev
Date: Mon Aug  3 21:27:09 2009
New Revision: 800574

URL: http://svn.apache.org/viewvc?rev=800574&view=rev
Log:
ZOOKEEPER-481. Add lastMessageSent to QuorumCnxManager. (flavio via mahadev)

Added:
    hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Modified:
    hadoop/zookeeper/trunk/CHANGES.txt
    hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=800574&r1=800573&r2=800574&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Mon Aug  3 21:27:09 2009
@@ -27,6 +27,9 @@
   ZOOKEEPER-457. Make ZookeeperMain public, support for HBase (and other)
   embedded clients (ryan rawson via phunt)
 
+  ZOOKEEPER-481. Add lastMessageSent to QuorumCnxManager. (flavio via
+mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
   "socket reuse" and failure to close client (phunt via mahadev)

Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=800574&r1=800573&r2=800574&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Mon Aug  3 21:27:09 2009
@@ -78,11 +78,12 @@
      */
     ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
     ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
+    ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
 
     /*
      * Reception queue
      */
-    ArrayBlockingQueue<Message> recvQueue;
+    public ArrayBlockingQueue<Message> recvQueue;
 
     /*
      * Shutdown flag
@@ -93,7 +94,7 @@
     /*
      * Listener thread
      */
-    Listener listener;
+    public Listener listener;
 
     static class Message {
         Message(ByteBuffer buffer, long sid) {
@@ -109,6 +110,8 @@
         this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
         this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
         this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
+        this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
+        
         this.self = self;
 
         // Starts listener thread that waits for connection requests 
@@ -116,11 +119,25 @@
     }
 
     /**
+     * Invokes initiateConnection for testing purposes.
+     * 
+     * @param sid identifier of the server to open a connection to
+     */
+    public void testInitiateConnection(long sid) throws Exception {
+        SocketChannel channel;
+        LOG.debug("Opening channel to server "  + sid);
+        channel = SocketChannel
+                .open(self.quorumPeers.get(sid).electionAddr);
+        channel.socket().setTcpNoDelay(true);
+        initiateConnection(channel, sid);
+    }
+    
+    /**
      * If this server has initiated the connection, then it gives up on the
      * connection if it loses challenge. Otherwise, it keeps the connection.
      */
 
-    boolean initiateConnection(SocketChannel s, Long sid) {
+    public boolean initiateConnection(SocketChannel s, Long sid) {
          try {
             // Sending id and challenge
             byte[] msgBytes = new byte[8];
@@ -144,31 +161,26 @@
                         + "reopen connection: ", e);
             }
         // Otherwise proceed with the connection
-        } else {
-        	SendWorker sw = new SendWorker(s, sid);
-        	RecvWorker rw = new RecvWorker(s, sid);
-        	sw.setRecv(rw);
-        	
-        	if (senderWorkerMap
-        			.containsKey(sid)) {
-        	    SendWorker vsw = senderWorkerMap.get(sid);
-        	    if(vsw != null)
-        	        vsw.finish();
-        	    else LOG.error("No SendWorker for this identifier (" + sid + ")");
-        	} else {
-        	    LOG.error("Cannot open channel to server "  + sid);
-        	}
-
-        	if (!queueSendMap.containsKey(sid)) {
-        		queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
-        				CAPACITY));
-        	}
-                    
-        	senderWorkerMap.put(sid, sw);
-        	sw.start();
-        	rw.start();
+        } else {    
+            SendWorker sw = new SendWorker(s, sid);
+            RecvWorker rw = new RecvWorker(s, sid);
+            sw.setRecv(rw);
+
+            SendWorker vsw = senderWorkerMap.get(sid);
+            senderWorkerMap.put(sid, sw);
+            
+            if(vsw != null)
+                vsw.finish();
 
-        	return true;
+            if (!queueSendMap.containsKey(sid)) {
+                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
+                        CAPACITY));
+            }
+            
+            sw.start();
+            rw.start();
+            
+            return true;    
             
         }
         return false;
@@ -225,27 +237,25 @@
             }
         //Otherwise start worker threads to receive data.
         } else {
-        	SendWorker sw = new SendWorker(s, sid);
-        	RecvWorker rw = new RecvWorker(s, sid);
-        	sw.setRecv(rw);
-
-        	if (senderWorkerMap.containsKey(sid)) {
-        	    SendWorker vsw = senderWorkerMap.get(sid);
-        	    if(vsw != null)
-        	        vsw.finish();
-        	    else LOG.error("No SendWorker for this identifier (" + sid + ")");
-        	}
-                    
-        	senderWorkerMap.put(sid, sw);
-                    
-        	if (!queueSendMap.containsKey(sid)) {
-        		queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
-        				CAPACITY));
-        	}      
-        	sw.start();
-        	rw.start();
+            SendWorker sw = new SendWorker(s, sid);
+            RecvWorker rw = new RecvWorker(s, sid);
+            sw.setRecv(rw);
 
-        	return true;    
+            SendWorker vsw = senderWorkerMap.get(sid);
+            senderWorkerMap.put(sid, sw);
+            
+            if(vsw != null)
+                vsw.finish();
+
+            if (!queueSendMap.containsKey(sid)) {
+                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
+                        CAPACITY));
+            }
+            
+            sw.start();
+            rw.start();
+            
+            return true;    
         }
         return false;
     }
@@ -254,7 +264,7 @@
      * Processes invoke this message to queue a message to send. Currently, 
      * only leader election uses it.
      */
-    void toSend(Long sid, ByteBuffer b) {
+    public void toSend(Long sid, ByteBuffer b) {
         /*
          * If sending message to myself, then simply enqueue it (loopback).
          */
@@ -296,7 +306,7 @@
             } catch (InterruptedException e) {
                 LOG.warn("Interrupted while waiting to put message in queue.",
                         e);
-            }
+            } 
     }
     
     /**
@@ -306,9 +316,10 @@
      */
     
     synchronized void connectOne(long sid){
-        if ((senderWorkerMap.get(sid) == null)) {
-            SocketChannel channel;
+        
+        if ((senderWorkerMap.get(sid) == null)){ 
             try {
+                SocketChannel channel;
                 LOG.debug("Opening channel to server "  + sid);
                 channel = SocketChannel
                         .open(self.quorumPeers.get(sid).electionAddr);
@@ -362,6 +373,9 @@
         softHalt();
     }
    
+    /**
+     * A soft halt simply finishes workers.
+     */
     public void softHalt(){
     	for(SendWorker sw: senderWorkerMap.values()){
     		LOG.debug("Halting sender: " + sw);
@@ -372,7 +386,7 @@
     /**
      * Thread to listen on some port
      */
-    class Listener extends Thread {
+    public class Listener extends Thread {
 
         volatile ServerSocketChannel ss = null;
         /**
@@ -380,7 +394,6 @@
          */
         @Override
         public void run() {
-            //ss = null;
             try {
                 ss = ServerSocketChannel.open();
                 int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
@@ -393,10 +406,10 @@
                     Socket sock = client.socket();
                     sock.setTcpNoDelay(true);
                     
-                    LOG.info("Connection request "
+                    LOG.debug("Connection request "
                             + sock.getRemoteSocketAddress());
-                    //synchronized(senderWorkerMap){
-                    LOG.info("Connection request: " + self.getId());
+                    
+                    LOG.debug("Connection request: " + self.getId());
                     receiveConnection(client);
                 }
             } catch (IOException e) {
@@ -404,10 +417,13 @@
             }
         }
         
+        /**
+         * Halts this listener thread.
+         */
         void halt(){
             try{
                 LOG.debug("Trying to close listener: " + ss);
-                if(ss != null)/* && (ss.isOpen()))*/{
+                if(ss != null) {
                     LOG.debug("Closing listener: " + self.getId());
                     ss.close();
                 }
@@ -423,12 +439,18 @@
      * one.
      */
     class SendWorker extends Thread {
-        // Send msgs to peer
         Long sid;
         SocketChannel channel;
         RecvWorker recvWorker;
         volatile boolean running = true;
 
+        /**
+         * An instance of this thread receives messages to send
+         * through a queue and sends them to the server sid.
+         * 
+         * @param channel SocketChannel
+         * @param sid   Server identifier
+         */
         SendWorker(SocketChannel channel, Long sid) {
             this.sid = sid;
             this.channel = channel;
@@ -437,25 +459,63 @@
             LOG.debug("Address of remote peer: " + this.sid);
         }
 
-        void setRecv(RecvWorker recvWorker) {
+        synchronized void setRecv(RecvWorker recvWorker) {
             this.recvWorker = recvWorker;
         }
 
+        /**
+         * Returns RecvWorker that pairs up with this SendWorker.
+         * 
+         * @return RecvWorker 
+         */
+        synchronized RecvWorker getRecvWorker(){
+            return recvWorker;
+        }
+                
         synchronized boolean finish() {
             running = false;
 
             LOG.debug("Calling finish");
             this.interrupt();
+            try{
+                channel.close();
+            } catch (IOException e) {
+                LOG.warn("Exception while closing socket");
+            }
+            //channel = null;
+            
+            this.interrupt();
             if (recvWorker != null)
                 recvWorker.finish();
             senderWorkerMap.remove(sid);
             return running;
         }
+        
+        synchronized void send(ByteBuffer b) throws IOException {
+            byte[] msgBytes = new byte[b.capacity()
+                                       + (Integer.SIZE / 8)];
+            ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+            msgBuffer.putInt(b.capacity());
+
+            msgBuffer.put(b.array(), 0, b.capacity());
+            msgBuffer.position(0);
+            if(channel != null)
+                channel.write(msgBuffer);
+            else
+                throw new IOException("SocketChannel is null");
+        }
 
         @Override
         public void run() {
-
-            while (running && !shutdown) {
+            try{
+                ByteBuffer b = lastMessageSent.get(sid); 
+                if(b != null) send(b);   
+            } catch (IOException e) {
+                LOG.error("Failed to send last message. Shutting down thread.");
+                this.finish();
+            }
+            
+            while (running && !shutdown && channel != null) {
 
                 ByteBuffer b = null;
                 try {
@@ -471,37 +531,15 @@
                             e);
                     continue;
                 }
-
+                
+                if(b != null)
+                    lastMessageSent.put(sid, b);
+                
                 try {
-                    byte[] msgBytes = new byte[b.capacity()
-                            + (Integer.SIZE / 8)];
-                    ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
-                    msgBuffer.putInt(b.capacity());
-
-                    msgBuffer.put(b.array(), 0, b.capacity());
-                    msgBuffer.position(0);
-                    channel.write(msgBuffer);
-
-                } catch (IOException e) {
-                    /*
-                     * If reconnection doesn't work, then put the
-                     * message back to the beginning of the queue and leave.
-                     */
+                    if(b != null) send(b);
+                } catch (Exception e) {
                     LOG.warn("Exception when using channel: " + sid, e);
-                    finish();
-                    recvWorker.finish();
-                    recvWorker = null;
-                    
-                    ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
-                    if(bq != null){
-                        if (bq.size() == 0) {
-                            boolean ret = bq.offer(b);
-                            if (!ret) {
-                                // to appease findbugs
-                                LOG.error("Not able to add to a quue of size 0");
-                            }
-                        }
-                    } else LOG.error("No queue for server " + sid);
+                    this.finish();
                 }
             }
             LOG.warn("Send worker leaving thread");
@@ -521,7 +559,12 @@
             this.sid = sid;
             this.channel = channel;
         }
-
+        
+        /**
+         * Shuts down this worker
+         * 
+         * @return boolean  Value of variable running
+         */
         synchronized boolean finish() {
             running = false;
             this.interrupt();
@@ -533,7 +576,7 @@
             try {
                 byte[] size = new byte[4];
                 ByteBuffer msgLength = ByteBuffer.wrap(size);
-                while (running && !shutdown && channel.isConnected()) {
+                while (running && !shutdown && channel != null) {
                     /**
                      * Reads the first int to determine the length of the
                      * message

Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=800574&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Mon Aug  3 21:27:09 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.Semaphore;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Test;
+
+
+/**
+ * This test uses two mock servers, each running an instance of QuorumCnxManager.
+ * It simulates the situation in which a peer P sends a message to another peer Q 
+ * while Q is trying to open a connection to P. In this test, Q initiates a connection
+ * to P as soon as it receives a message from P, and verifies that it receives a 
+ * copy of the message.
+ * 
+ * This simple test verifies that the new mechanism, which resends the last message
+ * sent upon a re-connection, works. 
+ *
+ */
+public class CnxManagerTest extends TestCase {
+    protected static final Logger LOG = Logger.getLogger(CnxManagerTest.class); // this test's own log category
+    
+    int count;                         // number of mock quorum peers (set in setUp)
+    HashMap<Long,QuorumServer> peers;  // server id -> QuorumServer address entry
+    File tmpdir[];                     // one temporary directory per peer
+    int port[];                        // client port assigned to each peer
+    
+    public void setUp() throws Exception {
+        
+        this.count = 3;
+        this.peers = new HashMap<Long,QuorumServer>(count); 
+        tmpdir = new File[count];
+        port = new int[count];
+        
+        for(int i = 0; i < count; i++) {
+            int clientport = PortAssignment.unique();
+            peers.put(Long.valueOf(i),
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                    new InetSocketAddress(PortAssignment.unique())));
+            tmpdir[i] = ClientBase.createTmpDir();
+            port[i] = clientport;
+        }
+    }
+    
+    public void tearDown() {
+        
+    }
+    
+    
+    ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
+        byte requestBytes[] = new byte[28];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);  
+        
+        /*
+         * Building notification packet to send
+         */
+                
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(epoch);
+        
+        return requestBuffer;
+    }
+    
+    class CnxManagerThread extends Thread {
+        
+        CnxManagerThread(){}
+        
+        public void run(){
+            try {
+                QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
+                QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+                QuorumCnxManager.Listener listener = cnxManager.listener;
+                if(listener != null){
+                    listener.start();
+                } else {
+                    LOG.error("Null listener when initializing cnx manager");
+                }
+                
+                long sid = 1;
+                cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
+                cnxManager.recvQueue.take();
+                cnxManager.testInitiateConnection(sid);
+            
+                cnxManager.recvQueue.take();
+            } catch (Exception e) {
+                LOG.error("Exception while running mock thread", e);
+                fail("Unexpected exception");
+            }
+        }
+    }
+    
+    @Test
+    public void testCnxManager() throws Exception {
+        Thread thread = new CnxManagerThread();
+        
+        thread.start();
+        
+        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+        QuorumCnxManager.Listener listener = cnxManager.listener;
+        if(listener != null){
+            listener.start();
+        } else {
+            LOG.error("Null listener when initializing cnx manager");
+        }
+            
+        cnxManager.toSend(new Long(0), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
+        cnxManager.recvQueue.take();
+        
+        thread.join(5000);
+        if (thread.isAlive()) {
+            fail("Threads didn't join");
+        }
+    }
+    
+    
+    
+}
\ No newline at end of file