You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2009/08/03 23:27:09 UTC
svn commit: r800574 - in /hadoop/zookeeper/trunk: CHANGES.txt
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Author: mahadev
Date: Mon Aug 3 21:27:09 2009
New Revision: 800574
URL: http://svn.apache.org/viewvc?rev=800574&view=rev
Log:
ZOOKEEPER-481. Add lastMessageSent to QuorumCnxManager. (flavio via mahadev)
Added:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=800574&r1=800573&r2=800574&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Mon Aug 3 21:27:09 2009
@@ -27,6 +27,9 @@
ZOOKEEPER-457. Make ZookeeperMain public, support for HBase (and other)
embedded clients (ryan rawson via phunt)
+ ZOOKEEPER-481. Add lastMessageSent to QuorumCnxManager. (flavio via
+mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to
"socket reuse" and failure to close client (phunt via mahadev)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=800574&r1=800573&r2=800574&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Mon Aug 3 21:27:09 2009
@@ -78,11 +78,12 @@
*/
ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>> queueSendMap;
+ ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
/*
* Reception queue
*/
- ArrayBlockingQueue<Message> recvQueue;
+ public ArrayBlockingQueue<Message> recvQueue;
/*
* Shutdown flag
@@ -93,7 +94,7 @@
/*
* Listener thread
*/
- Listener listener;
+ public Listener listener;
static class Message {
Message(ByteBuffer buffer, long sid) {
@@ -109,6 +110,8 @@
this.recvQueue = new ArrayBlockingQueue<Message>(CAPACITY);
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
+ this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
+
this.self = self;
// Starts listener thread that waits for connection requests
@@ -116,11 +119,25 @@
}
/**
+ * Invokes initiateConnection for testing purposes
+ *
+ * @param sid
+ */
+ public void testInitiateConnection(long sid) throws Exception {
+ SocketChannel channel;
+ LOG.debug("Opening channel to server " + sid);
+ channel = SocketChannel
+ .open(self.quorumPeers.get(sid).electionAddr);
+ channel.socket().setTcpNoDelay(true);
+ initiateConnection(channel, sid);
+ }
+
+ /**
* If this server has initiated the connection, then it gives up on the
* connection if it loses challenge. Otherwise, it keeps the connection.
*/
- boolean initiateConnection(SocketChannel s, Long sid) {
+ public boolean initiateConnection(SocketChannel s, Long sid) {
try {
// Sending id and challenge
byte[] msgBytes = new byte[8];
@@ -144,31 +161,26 @@
+ "reopen connection: ", e);
}
// Otherwise proceed with the connection
- } else {
- SendWorker sw = new SendWorker(s, sid);
- RecvWorker rw = new RecvWorker(s, sid);
- sw.setRecv(rw);
-
- if (senderWorkerMap
- .containsKey(sid)) {
- SendWorker vsw = senderWorkerMap.get(sid);
- if(vsw != null)
- vsw.finish();
- else LOG.error("No SendWorker for this identifier (" + sid + ")");
- } else {
- LOG.error("Cannot open channel to server " + sid);
- }
-
- if (!queueSendMap.containsKey(sid)) {
- queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
- CAPACITY));
- }
-
- senderWorkerMap.put(sid, sw);
- sw.start();
- rw.start();
+ } else {
+ SendWorker sw = new SendWorker(s, sid);
+ RecvWorker rw = new RecvWorker(s, sid);
+ sw.setRecv(rw);
+
+ SendWorker vsw = senderWorkerMap.get(sid);
+ senderWorkerMap.put(sid, sw);
+
+ if(vsw != null)
+ vsw.finish();
- return true;
+ if (!queueSendMap.containsKey(sid)) {
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
+ CAPACITY));
+ }
+
+ sw.start();
+ rw.start();
+
+ return true;
}
return false;
@@ -225,27 +237,25 @@
}
//Otherwise start worker threads to receive data.
} else {
- SendWorker sw = new SendWorker(s, sid);
- RecvWorker rw = new RecvWorker(s, sid);
- sw.setRecv(rw);
-
- if (senderWorkerMap.containsKey(sid)) {
- SendWorker vsw = senderWorkerMap.get(sid);
- if(vsw != null)
- vsw.finish();
- else LOG.error("No SendWorker for this identifier (" + sid + ")");
- }
-
- senderWorkerMap.put(sid, sw);
-
- if (!queueSendMap.containsKey(sid)) {
- queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
- CAPACITY));
- }
- sw.start();
- rw.start();
+ SendWorker sw = new SendWorker(s, sid);
+ RecvWorker rw = new RecvWorker(s, sid);
+ sw.setRecv(rw);
- return true;
+ SendWorker vsw = senderWorkerMap.get(sid);
+ senderWorkerMap.put(sid, sw);
+
+ if(vsw != null)
+ vsw.finish();
+
+ if (!queueSendMap.containsKey(sid)) {
+ queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
+ CAPACITY));
+ }
+
+ sw.start();
+ rw.start();
+
+ return true;
}
return false;
}
@@ -254,7 +264,7 @@
* Processes invoke this message to queue a message to send. Currently,
* only leader election uses it.
*/
- void toSend(Long sid, ByteBuffer b) {
+ public void toSend(Long sid, ByteBuffer b) {
/*
* If sending message to myself, then simply enqueue it (loopback).
*/
@@ -296,7 +306,7 @@
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting to put message in queue.",
e);
- }
+ }
}
/**
@@ -306,9 +316,10 @@
*/
synchronized void connectOne(long sid){
- if ((senderWorkerMap.get(sid) == null)) {
- SocketChannel channel;
+
+ if ((senderWorkerMap.get(sid) == null)){
try {
+ SocketChannel channel;
LOG.debug("Opening channel to server " + sid);
channel = SocketChannel
.open(self.quorumPeers.get(sid).electionAddr);
@@ -362,6 +373,9 @@
softHalt();
}
+ /**
+ * A soft halt simply finishes workers.
+ */
public void softHalt(){
for(SendWorker sw: senderWorkerMap.values()){
LOG.debug("Halting sender: " + sw);
@@ -372,7 +386,7 @@
/**
* Thread to listen on some port
*/
- class Listener extends Thread {
+ public class Listener extends Thread {
volatile ServerSocketChannel ss = null;
/**
@@ -380,7 +394,6 @@
*/
@Override
public void run() {
- //ss = null;
try {
ss = ServerSocketChannel.open();
int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
@@ -393,10 +406,10 @@
Socket sock = client.socket();
sock.setTcpNoDelay(true);
- LOG.info("Connection request "
+ LOG.debug("Connection request "
+ sock.getRemoteSocketAddress());
- //synchronized(senderWorkerMap){
- LOG.info("Connection request: " + self.getId());
+
+ LOG.debug("Connection request: " + self.getId());
receiveConnection(client);
}
} catch (IOException e) {
@@ -404,10 +417,13 @@
}
}
+ /**
+ * Halts this listener thread.
+ */
void halt(){
try{
LOG.debug("Trying to close listener: " + ss);
- if(ss != null)/* && (ss.isOpen()))*/{
+ if(ss != null) {
LOG.debug("Closing listener: " + self.getId());
ss.close();
}
@@ -423,12 +439,18 @@
* one.
*/
class SendWorker extends Thread {
- // Send msgs to peer
Long sid;
SocketChannel channel;
RecvWorker recvWorker;
volatile boolean running = true;
+ /**
+ * An instance of this thread receives messages to send
+ * through a queue and sends them to the server sid.
+ *
+ * @param channel SocketChannel
+ * @param sid Server identifier
+ */
SendWorker(SocketChannel channel, Long sid) {
this.sid = sid;
this.channel = channel;
@@ -437,25 +459,63 @@
LOG.debug("Address of remote peer: " + this.sid);
}
- void setRecv(RecvWorker recvWorker) {
+ /**
+ * Pairs this SendWorker with a RecvWorker; finish() also finishes the
+ * paired RecvWorker. Synchronized because getRecvWorker() and finish()
+ * read the field while holding this worker's lock.
+ *
+ * @param recvWorker RecvWorker that callers create for the same channel and sid
+ */
+ synchronized void setRecv(RecvWorker recvWorker) {
this.recvWorker = recvWorker;
}
+ /**
+ * Gets the RecvWorker that was paired with this SendWorker through setRecv.
+ *
+ * @return the paired RecvWorker
+ */
+ synchronized RecvWorker getRecvWorker() {
+ return this.recvWorker;
+ }
+
synchronized boolean finish() {
running = false;
LOG.debug("Calling finish");
this.interrupt();
+ try{
+ channel.close();
+ } catch (IOException e) {
+ LOG.warn("Exception while closing socket");
+ }
+ //channel = null;
+
+ this.interrupt();
if (recvWorker != null)
recvWorker.finish();
senderWorkerMap.remove(sid);
return running;
}
+
+ synchronized void send(ByteBuffer b) throws IOException {
+ byte[] msgBytes = new byte[b.capacity()
+ + (Integer.SIZE / 8)];
+ ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
+ msgBuffer.putInt(b.capacity());
+
+ msgBuffer.put(b.array(), 0, b.capacity());
+ msgBuffer.position(0);
+ if(channel != null)
+ channel.write(msgBuffer);
+ else
+ throw new IOException("SocketChannel is null");
+ }
@Override
public void run() {
-
- while (running && !shutdown) {
+ try{
+ ByteBuffer b = lastMessageSent.get(sid);
+ if(b != null) send(b);
+ } catch (IOException e) {
+ LOG.error("Failed to send last message. Shutting down thread.");
+ this.finish();
+ }
+
+ while (running && !shutdown && channel != null) {
ByteBuffer b = null;
try {
@@ -471,37 +531,15 @@
e);
continue;
}
-
+
+ if(b != null)
+ lastMessageSent.put(sid, b);
+
try {
- byte[] msgBytes = new byte[b.capacity()
- + (Integer.SIZE / 8)];
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
- msgBuffer.putInt(b.capacity());
-
- msgBuffer.put(b.array(), 0, b.capacity());
- msgBuffer.position(0);
- channel.write(msgBuffer);
-
- } catch (IOException e) {
- /*
- * If reconnection doesn't work, then put the
- * message back to the beginning of the queue and leave.
- */
+ if(b != null) send(b);
+ } catch (Exception e) {
LOG.warn("Exception when using channel: " + sid, e);
- finish();
- recvWorker.finish();
- recvWorker = null;
-
- ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);
- if(bq != null){
- if (bq.size() == 0) {
- boolean ret = bq.offer(b);
- if (!ret) {
- // to appease findbugs
- LOG.error("Not able to add to a quue of size 0");
- }
- }
- } else LOG.error("No queue for server " + sid);
+ this.finish();
}
}
LOG.warn("Send worker leaving thread");
@@ -521,7 +559,12 @@
this.sid = sid;
this.channel = channel;
}
-
+
+ /**
+ * Shuts down this worker
+ *
+ * @return boolean Value of variable running
+ */
synchronized boolean finish() {
running = false;
this.interrupt();
@@ -533,7 +576,7 @@
try {
byte[] size = new byte[4];
ByteBuffer msgLength = ByteBuffer.wrap(size);
- while (running && !shutdown && channel.isConnected()) {
+ while (running && !shutdown && channel != null) {
/**
* Reads the first int to determine the length of the
* message
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=800574&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Mon Aug 3 21:27:09 2009
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+import junit.framework.TestCase;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
+import org.apache.zookeeper.server.quorum.QuorumCnxManager;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.Vote;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.junit.Test;
+
+
+/**
+ * This test uses two mock servers, each running an instance of QuorumCnxManager.
+ * It simulates the situation in which a peer P sends a message to another peer Q
+ * while Q is trying to open a connection to P. In this test, Q iniates a connection
+ * to P as soon as it receives a message from P, and verifies that it receives a
+ * copy of the message.
+ *
+ * This simple tests verifies that the new mechanism that duplicates the last message
+ * sent upon a re-connection works.
+ *
+ */
+public class CnxManagerTest extends TestCase {
+ protected static final Logger LOG = Logger.getLogger(FLENewEpochTest.class);
+
+ int count;
+ HashMap<Long,QuorumServer> peers;
+ File tmpdir[];
+ int port[];
+
+ public void setUp() throws Exception {
+
+ this.count = 3;
+ this.peers = new HashMap<Long,QuorumServer>(count);
+ tmpdir = new File[count];
+ port = new int[count];
+
+ for(int i = 0; i < count; i++) {
+ int clientport = PortAssignment.unique();
+ peers.put(Long.valueOf(i),
+ new QuorumServer(i,
+ new InetSocketAddress(clientport),
+ new InetSocketAddress(PortAssignment.unique())));
+ tmpdir[i] = ClientBase.createTmpDir();
+ port[i] = clientport;
+ }
+ }
+
+ public void tearDown() {
+
+ }
+
+
+ ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
+ byte requestBytes[] = new byte[28];
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+ /*
+ * Building notification packet to send
+ */
+
+ requestBuffer.clear();
+ requestBuffer.putInt(state);
+ requestBuffer.putLong(leader);
+ requestBuffer.putLong(zxid);
+ requestBuffer.putLong(epoch);
+
+ return requestBuffer;
+ }
+
+ class CnxManagerThread extends Thread {
+
+ CnxManagerThread(){}
+
+ public void run(){
+ try {
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
+ QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+ QuorumCnxManager.Listener listener = cnxManager.listener;
+ if(listener != null){
+ listener.start();
+ } else {
+ LOG.error("Null listener when initializing cnx manager");
+ }
+
+ long sid = 1;
+ cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
+ cnxManager.recvQueue.take();
+ cnxManager.testInitiateConnection(sid);
+
+ cnxManager.recvQueue.take();
+ } catch (Exception e) {
+ LOG.error("Exception while running mock thread", e);
+ fail("Unexpected exception");
+ }
+ }
+ }
+
+ @Test
+ public void testCnxManager() throws Exception {
+ Thread thread = new CnxManagerThread();
+
+ thread.start();
+
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+ QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
+ QuorumCnxManager.Listener listener = cnxManager.listener;
+ if(listener != null){
+ listener.start();
+ } else {
+ LOG.error("Null listener when initializing cnx manager");
+ }
+
+ cnxManager.toSend(new Long(0), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
+ cnxManager.recvQueue.take();
+
+ thread.join(5000);
+ if (thread.isAlive()) {
+ fail("Threads didn't join");
+ }
+ }
+
+
+
+}
\ No newline at end of file