You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2012/12/19 19:07:14 UTC
svn commit: r1423990 [1/2] - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/
src/java/test/org/apache/zookeeper/test/
Author: phunt
Date: Wed Dec 19 18:07:14 2012
New Revision: 1423990
URL: http://svn.apache.org/viewvc?rev=1423990&view=rev
Log:
ZOOKEEPER-1504. Multi-thread NIOServerCnxn (Jay Shrauner via phunt)
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ServerCnxnTest.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1423990&r1=1423989&r2=1423990&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Dec 19 18:07:14 2012
@@ -450,6 +450,8 @@ IMPROVEMENTS:
ZOOKEEPER-1335. Add support for --config to zkEnv.sh to specify a config
directory different than what is expected (Arpit Gupta via mahadev)
+ ZOOKEEPER-1504. Multi-thread NIOServerCnxn (Jay Shrauner via phunt)
+
Release 3.4.0 -
Non-backward compatible changes:
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java?rev=1423990&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ExpiryQueue.java Wed Dec 19 18:07:14 2012
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * ExpiryQueue tracks elements in time sorted fixed duration buckets.
+ * It's used by SessionTrackerImpl to expire sessions and NIOServerCnxnFactory
+ * to expire connections.
+ */
+public class ExpiryQueue<E> {
+ private final ConcurrentHashMap<E, Long> elemMap =
+ new ConcurrentHashMap<E, Long>();
+ /**
+ * The maximum number of buckets is equal to max timeout/expirationInterval,
+ * so the expirationInterval should not be too small compared to the
+ * max timeout that this expiry queue needs to maintain.
+ */
+ private final ConcurrentHashMap<Long, Set<E>> expiryMap =
+ new ConcurrentHashMap<Long, Set<E>>();
+
+ private final AtomicLong nextExpirationTime = new AtomicLong();
+ private final int expirationInterval;
+
+ public ExpiryQueue(int expirationInterval) {
+ this.expirationInterval = expirationInterval;
+ nextExpirationTime.set(roundToNextInterval(System.currentTimeMillis()));
+ }
+
+ private long roundToNextInterval(long time) {
+ return (time / expirationInterval + 1) * expirationInterval;
+ }
+
+ /**
+ * Removes element from the queue.
+ * @param elem element to remove
+ * @return time at which the element was set to expire, or null if
+ * it wasn't present
+ */
+ public Long remove(E elem) {
+ Long expiryTime = elemMap.remove(elem);
+ if (expiryTime != null) {
+ Set<E> set = expiryMap.get(expiryTime);
+ if (set != null) {
+ set.remove(elem);
+ // We don't need to worry about removing empty sets,
+ // they'll eventually be removed when they expire.
+ }
+ }
+ return expiryTime;
+ }
+
+ /**
+ * Adds or updates expiration time for element in queue, rounding the
+ * timeout to the expiry interval bucketed used by this queue.
+ * @param elem element to add/update
+ * @param timeout timout in milliseconds
+ * @return time at which the element is now set to expire if
+ * changed, or null if unchanged
+ */
+ public Long update(E elem, int timeout) {
+ Long prevExpiryTime = elemMap.get(elem);
+ long now = System.currentTimeMillis();
+ Long newExpiryTime = roundToNextInterval(now + timeout);
+
+ if (newExpiryTime.equals(prevExpiryTime)) {
+ // No change, so nothing to update
+ return null;
+ }
+
+ // First add the elem to the new expiry time bucket in expiryMap.
+ Set<E> set = expiryMap.get(newExpiryTime);
+ if (set == null) {
+ // Construct a ConcurrentHashSet using a ConcurrentHashMap
+ set = Collections.newSetFromMap(
+ new ConcurrentHashMap<E, Boolean>());
+ // Put the new set in the map, but only if another thread
+ // hasn't beaten us to it
+ Set<E> existingSet = expiryMap.putIfAbsent(newExpiryTime, set);
+ if (existingSet != null) {
+ set = existingSet;
+ }
+ }
+ set.add(elem);
+
+ // Map the elem to the new expiry time. If a different previous
+ // mapping was present, clean up the previous expiry bucket.
+ prevExpiryTime = elemMap.put(elem, newExpiryTime);
+ if (prevExpiryTime != null && !newExpiryTime.equals(prevExpiryTime)) {
+ Set<E> prevSet = expiryMap.get(prevExpiryTime);
+ if (prevSet != null) {
+ prevSet.remove(elem);
+ }
+ }
+ return newExpiryTime;
+ }
+
+ /**
+ * @return milliseconds until next expiration time, or 0 if has already past
+ */
+ public long getWaitTime() {
+ long now = System.currentTimeMillis();
+ long expirationTime = nextExpirationTime.get();
+ return now < expirationTime ? (expirationTime - now) : 0L;
+ }
+
+ /**
+ * Remove the next expired set of elements from expireMap. This method needs
+ * to be called frequently enough by checking getWaitTime(), otherwise there
+ * will be a backlog of empty sets queued up in expiryMap.
+ *
+ * @return next set of expired elements, or an empty set if none are
+ * ready
+ */
+ public Set<E> poll() {
+ long now = System.currentTimeMillis();
+ long expirationTime = nextExpirationTime.get();
+ if (now < expirationTime) {
+ return Collections.emptySet();
+ }
+
+ Set<E> set = null;
+ long newExpirationTime = expirationTime + expirationInterval;
+ if (nextExpirationTime.compareAndSet(
+ expirationTime, newExpirationTime)) {
+ set = expiryMap.remove(expirationTime);
+ }
+ if (set == null) {
+ return Collections.emptySet();
+ }
+ return set;
+ }
+
+ public void dump(PrintWriter pwriter) {
+ pwriter.print("Sets (");
+ pwriter.print(expiryMap.size());
+ pwriter.print(")/(");
+ pwriter.print(elemMap.size());
+ pwriter.println("):");
+ ArrayList<Long> keys = new ArrayList<Long>(expiryMap.keySet());
+ Collections.sort(keys);
+ for (long time : keys) {
+ Set<E> set = expiryMap.get(time);
+ if (set != null) {
+ pwriter.print(set.size());
+ pwriter.print(" expire at ");
+ pwriter.print(new Date(time));
+ pwriter.println(":");
+ for (E elem : set) {
+ pwriter.print("\t");
+ pwriter.println(elem.toString());
+ }
+ }
+ }
+ }
+}
+
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=1423990&r1=1423989&r2=1423990&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Wed Dec 19 18:07:14 2012
@@ -29,9 +29,11 @@ import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
-import java.util.HashSet;
import java.util.List;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.jute.BinaryInputArchive;
@@ -46,6 +48,7 @@ import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.WatcherEvent;
+import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
@@ -58,58 +61,63 @@ import org.apache.zookeeper.server.util.
public class NIOServerCnxn extends ServerCnxn {
static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxn.class);
- NIOServerCnxnFactory factory;
+ private final NIOServerCnxnFactory factory;
- SocketChannel sock;
+ private SocketChannel sock;
+
+ private final SelectorThread selectorThread;
private final SelectionKey sk;
- boolean initialized;
+ private boolean initialized;
- ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+ private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
- ByteBuffer incomingBuffer = lenBuffer;
+ private ByteBuffer incomingBuffer = lenBuffer;
- LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
+ private final Queue<ByteBuffer> outgoingBuffers =
+ new LinkedBlockingQueue<ByteBuffer>();
- int sessionTimeout;
+ private int sessionTimeout;
private final ZooKeeperServer zkServer;
/**
* The number of requests that have been submitted but not yet responded to.
*/
- int outstandingRequests;
+ private final AtomicInteger outstandingRequests = new AtomicInteger(0);
/**
* This is the id that uniquely identifies the session of a client. Once
* this session is no longer active, the ephemeral nodes will go away.
*/
- long sessionId;
+ private long sessionId;
- static long nextSessionId = 1;
- int outstandingLimit = 1;
+ private final int outstandingLimit;
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
- SelectionKey sk, NIOServerCnxnFactory factory) throws IOException {
+ SelectionKey sk, NIOServerCnxnFactory factory,
+ SelectorThread selectorThread) throws IOException {
this.zkServer = zk;
this.sock = sock;
this.sk = sk;
this.factory = factory;
+ this.selectorThread = selectorThread;
if (this.factory.login != null) {
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
}
- if (zk != null) {
+ if (zk != null) {
outstandingLimit = zk.getGlobalOutstandingLimit();
+ } else {
+ outstandingLimit = 1;
}
sock.socket().setTcpNoDelay(true);
- /* set socket linger to false, so that socket close does not
- * block */
+ /* set socket linger to false, so that socket close does not block */
sock.socket().setSoLinger(false, -1);
InetAddress addr = ((InetSocketAddress) sock.socket()
.getRemoteSocketAddress()).getAddress();
authInfo.add(new Id("ip", addr.getHostAddress()));
- sk.interestOps(SelectionKey.OP_READ);
+ this.sessionTimeout = factory.sessionlessCnxnTimeout;
}
/* Send close connection packet to the client, doIO will eventually
@@ -127,56 +135,32 @@ public class NIOServerCnxn extends Serve
void sendBufferSync(ByteBuffer bb) {
try {
/* configure socket to be blocking
- * so that we dont have to do write in
+ * so that we dont have to do write in
* a tight while loop
*/
- sock.configureBlocking(true);
if (bb != ServerCnxnFactory.closeConn) {
if (sock != null) {
+ sock.configureBlocking(true);
sock.write(bb);
}
packetSent();
- }
+ }
} catch (IOException ie) {
LOG.error("Error sending data synchronously ", ie);
}
}
-
- public void sendBuffer(ByteBuffer bb) {
- try {
- if (bb != ServerCnxnFactory.closeConn) {
- // We check if write interest here because if it is NOT set,
- // nothing is queued, so we can try to send the buffer right
- // away without waking up the selector
- if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
- try {
- sock.write(bb);
- } catch (IOException e) {
- // we are just doing best effort right now
- }
- }
- // if there is nothing left to send, we are done
- if (bb.remaining() == 0) {
- packetSent();
- return;
- }
- }
- synchronized(this.factory){
- sk.selector().wakeup();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
- + " is valid: " + sk.isValid());
- }
- outgoingBuffers.add(bb);
- if (sk.isValid()) {
- sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
- }
- }
-
- } catch(Exception e) {
- LOG.error("Unexpected Exception: ", e);
+ /**
+ * sendBuffer pushes a byte buffer onto the outgoing buffer queue for
+ * asynchronous writes.
+ */
+ public void sendBuffer(ByteBuffer bb) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
+ + " is valid: " + sk.isValid());
}
+ outgoingBuffers.add(bb);
+ requestInterestOpsUpdate();
}
/** Read the request payload (everything following the length prefix) */
@@ -204,6 +188,123 @@ public class NIOServerCnxn extends Serve
}
}
+ /**
+ * This boolean tracks whether the connection is ready for selection or
+ * not. A connection is marked as not ready for selection while it is
+ * processing an IO request. The flag is used to gatekeep pushing interest
+ * op updates onto the selector.
+ */
+ private final AtomicBoolean selectable = new AtomicBoolean(true);
+
+ public boolean isSelectable() {
+ return sk.isValid() && selectable.get();
+ }
+
+ public void disableSelectable() {
+ selectable.set(false);
+ }
+
+ public void enableSelectable() {
+ selectable.set(true);
+ }
+
+ private void requestInterestOpsUpdate() {
+ if (isSelectable()) {
+ selectorThread.addInterestOpsUpdateRequest(sk);
+ }
+ }
+
+ void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
+ if (outgoingBuffers.isEmpty()) {
+ return;
+ }
+
+ /*
+ * This is going to reset the buffer position to 0 and the
+ * limit to the size of the buffer, so that we can fill it
+ * with data from the non-direct buffers that we need to
+ * send.
+ */
+ ByteBuffer directBuffer = NIOServerCnxnFactory.getDirectBuffer();
+ if (directBuffer == null) {
+ ByteBuffer[] bufferList = new ByteBuffer[outgoingBuffers.size()];
+ // Use gathered write call. This updates the positions of the
+ // byte buffers to reflect the bytes that were written out.
+ sock.write(outgoingBuffers.toArray(bufferList));
+
+ // Remove the buffers that we have sent
+ ByteBuffer bb;
+ while ((bb = outgoingBuffers.peek()) != null) {
+ if (bb == ServerCnxnFactory.closeConn) {
+ throw new CloseRequestException("close requested");
+ }
+ if (bb.remaining() > 0) {
+ break;
+ }
+ packetSent();
+ outgoingBuffers.remove();
+ }
+ } else {
+ directBuffer.clear();
+
+ for (ByteBuffer b : outgoingBuffers) {
+ if (directBuffer.remaining() < b.remaining()) {
+ /*
+ * When we call put later, if the directBuffer is to
+ * small to hold everything, nothing will be copied,
+ * so we've got to slice the buffer if it's too big.
+ */
+ b = (ByteBuffer) b.slice().limit(
+ directBuffer.remaining());
+ }
+ /*
+ * put() is going to modify the positions of both
+ * buffers, put we don't want to change the position of
+ * the source buffers (we'll do that after the send, if
+ * needed), so we save and reset the position after the
+ * copy
+ */
+ int p = b.position();
+ directBuffer.put(b);
+ b.position(p);
+ if (directBuffer.remaining() == 0) {
+ break;
+ }
+ }
+ /*
+ * Do the flip: limit becomes position, position gets set to
+ * 0. This sets us up for the write.
+ */
+ directBuffer.flip();
+
+ int sent = sock.write(directBuffer);
+
+ ByteBuffer bb;
+
+ // Remove the buffers that we have sent
+ while ((bb = outgoingBuffers.peek()) != null) {
+ if (bb == ServerCnxnFactory.closeConn) {
+ throw new CloseRequestException("close requested");
+ }
+ if (sent < bb.remaining()) {
+ /*
+ * We only partially sent this buffer, so we update
+ * the position and exit the loop.
+ */
+ bb.position(bb.position() + sent);
+ break;
+ }
+ packetSent();
+ /* We've sent the whole buffer, so drop the buffer */
+ sent -= bb.remaining();
+ outgoingBuffers.remove();
+ }
+ }
+ }
+
+ /**
+ * Handles read/write IO on connection.
+ */
void doIO(SelectionKey k) throws InterruptedException {
try {
if (sock == null) {
@@ -241,101 +342,15 @@ public class NIOServerCnxn extends Serve
}
}
if (k.isWritable()) {
- // ZooLog.logTraceMessage(LOG,
- // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
- // "outgoingBuffers.size() = " +
- // outgoingBuffers.size());
- if (outgoingBuffers.size() > 0) {
- // ZooLog.logTraceMessage(LOG,
- // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
- // "sk " + k + " is valid: " +
- // k.isValid());
-
- /*
- * This is going to reset the buffer position to 0 and the
- * limit to the size of the buffer, so that we can fill it
- * with data from the non-direct buffers that we need to
- * send.
- */
- ByteBuffer directBuffer = factory.directBuffer;
- directBuffer.clear();
+ handleWrite(k);
- for (ByteBuffer b : outgoingBuffers) {
- if (directBuffer.remaining() < b.remaining()) {
- /*
- * When we call put later, if the directBuffer is to
- * small to hold everything, nothing will be copied,
- * so we've got to slice the buffer if it's too big.
- */
- b = (ByteBuffer) b.slice().limit(
- directBuffer.remaining());
- }
- /*
- * put() is going to modify the positions of both
- * buffers, put we don't want to change the position of
- * the source buffers (we'll do that after the send, if
- * needed), so we save and reset the position after the
- * copy
- */
- int p = b.position();
- directBuffer.put(b);
- b.position(p);
- if (directBuffer.remaining() == 0) {
- break;
- }
- }
- /*
- * Do the flip: limit becomes position, position gets set to
- * 0. This sets us up for the write.
- */
- directBuffer.flip();
-
- int sent = sock.write(directBuffer);
- ByteBuffer bb;
-
- // Remove the buffers that we have sent
- while (outgoingBuffers.size() > 0) {
- bb = outgoingBuffers.peek();
- if (bb == ServerCnxnFactory.closeConn) {
- throw new CloseRequestException("close requested");
- }
- int left = bb.remaining() - sent;
- if (left > 0) {
- /*
- * We only partially sent this buffer, so we update
- * the position and exit the loop.
- */
- bb.position(bb.position() + sent);
- break;
- }
- packetSent();
- /* We've sent the whole buffer, so drop the buffer */
- sent -= bb.remaining();
- outgoingBuffers.remove();
- }
- // ZooLog.logTraceMessage(LOG,
- // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
- // outgoingBuffers.size() = " + outgoingBuffers.size());
- }
-
- synchronized(this.factory){
- if (outgoingBuffers.size() == 0) {
- if (!initialized
- && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
- throw new CloseRequestException("responded to info probe");
- }
- sk.interestOps(sk.interestOps()
- & (~SelectionKey.OP_WRITE));
- } else {
- sk.interestOps(sk.interestOps()
- | SelectionKey.OP_WRITE);
- }
+ if (!initialized && !getReadInterest() && !getWriteInterest()) {
+ throw new CloseRequestException("responded to info probe");
}
}
} catch (CancelledKeyException e) {
- LOG.warn("Exception causing close of session 0x"
- + Long.toHexString(sessionId)
- + " due to " + e);
+ LOG.warn("CancelledKeyException causing close of session 0x"
+ + Long.toHexString(sessionId));
if (LOG.isDebugEnabled()) {
LOG.debug("CancelledKeyException stack trace", e);
}
@@ -344,14 +359,12 @@ public class NIOServerCnxn extends Serve
// expecting close to log session closure
close();
} catch (EndOfStreamException e) {
- LOG.warn("caught end of stream exception",e); // tell user why
-
+ LOG.warn(e.getMessage());
// expecting close to log session closure
close();
} catch (IOException e) {
LOG.warn("Exception causing close of session 0x"
- + Long.toHexString(sessionId)
- + " due to " + e);
+ + Long.toHexString(sessionId) + ": " + e.getMessage());
if (LOG.isDebugEnabled()) {
LOG.debug("IOException stack trace", e);
}
@@ -362,42 +375,50 @@ public class NIOServerCnxn extends Serve
private void readRequest() throws IOException {
zkServer.processPacket(this, incomingBuffer);
}
-
+
+ // Only called as callback from zkServer.processPacket()
protected void incrOutstandingRequests(RequestHeader h) {
if (h.getXid() >= 0) {
- synchronized (this) {
- outstandingRequests++;
- }
- synchronized (this.factory) {
- // check throttling
- if (zkServer.getInProcess() > outstandingLimit) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Throttling recv " + zkServer.getInProcess());
- }
- disableRecv();
- // following lines should not be needed since we are
- // already reading
- // } else {
- // enableRecv();
+ outstandingRequests.incrementAndGet();
+ // check throttling
+ int inProcess = zkServer.getInProcess();
+ if (inProcess > outstandingLimit) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Throttling recv " + inProcess);
}
+ disableRecv();
}
}
+ }
+ // returns whether we are interested in writing, which is determined
+ // by whether we have any pending buffers on the output queue or not
+ private boolean getWriteInterest() {
+ return !outgoingBuffers.isEmpty();
}
+ // returns whether we are interested in taking new requests, which is
+ // determined by whether we are currently throttled or not
+ private boolean getReadInterest() {
+ return !throttled.get();
+ }
+
+ private final AtomicBoolean throttled = new AtomicBoolean(false);
+
+ // Throttle acceptance of new requests. If this entailed a state change,
+ // register an interest op update request with the selector.
public void disableRecv() {
- sk.interestOps(sk.interestOps() & (~SelectionKey.OP_READ));
+ if (throttled.compareAndSet(false, true)) {
+ requestInterestOpsUpdate();
+ }
}
+ // Disable throttling and resume acceptance of new requests. If this
+ // entailed a state change, register an interest op update request with
+ // the selector.
public void enableRecv() {
- synchronized (this.factory) {
- sk.selector().wakeup();
- if (sk.isValid()) {
- int interest = sk.interestOps();
- if ((interest & SelectionKey.OP_READ) == 0) {
- sk.interestOps(interest | SelectionKey.OP_READ);
- }
- }
+ if (throttled.compareAndSet(true, false)) {
+ requestInterestOpsUpdate();
}
}
@@ -412,7 +433,7 @@ public class NIOServerCnxn extends Serve
/**
* clean up the socket related to a command and also make sure we flush the
* data before we do that
- *
+ *
* @param pwriter
* the pwriter for a command socket
*/
@@ -432,7 +453,7 @@ public class NIOServerCnxn extends Serve
}
}
}
-
+
/**
* This class wraps the sendBuffer method of NIOServerCnxn. It is
* responsible for chunking up the response to a client. Rather
@@ -441,7 +462,7 @@ public class NIOServerCnxn extends Serve
*/
private class SendBufferWriter extends Writer {
private StringBuffer sb = new StringBuffer();
-
+
/**
* Check if we are ready to send another chunk.
* @param force force sending, even if not a full chunk
@@ -475,20 +496,24 @@ public class NIOServerCnxn extends Serve
private static final String ZK_NOT_SERVING =
"This ZooKeeper instance is not currently serving requests";
-
+
/**
* Set of threads for commmand ports. All the 4
* letter commands are run via a thread. Each class
* maps to a corresponding 4 letter command. CommandThread
* is the abstract class from which all the others inherit.
*/
- private abstract class CommandThread extends Thread {
+ private abstract class CommandThread {
PrintWriter pw;
-
+
CommandThread(PrintWriter pw) {
this.pw = pw;
}
-
+
+ public void start() {
+ run();
+ }
+
public void run() {
try {
commandRun();
@@ -498,52 +523,52 @@ public class NIOServerCnxn extends Serve
cleanupWriterSocket(pw);
}
}
-
+
public abstract void commandRun() throws IOException;
}
-
+
private class RuokCommand extends CommandThread {
public RuokCommand(PrintWriter pw) {
super(pw);
}
-
+
@Override
public void commandRun() {
pw.print("imok");
-
+
}
}
-
+
private class TraceMaskCommand extends CommandThread {
TraceMaskCommand(PrintWriter pw) {
super(pw);
}
-
+
@Override
public void commandRun() {
long traceMask = ZooTrace.getTextTraceLevel();
pw.print(traceMask);
}
}
-
+
private class SetTraceMaskCommand extends CommandThread {
long trace = 0;
SetTraceMaskCommand(PrintWriter pw, long trace) {
super(pw);
this.trace = trace;
}
-
+
@Override
public void commandRun() {
pw.print(trace);
}
}
-
+
private class EnvCommand extends CommandThread {
EnvCommand(PrintWriter pw) {
super(pw);
}
-
+
@Override
public void commandRun() {
List<Environment.Entry> env = Environment.list();
@@ -554,15 +579,15 @@ public class NIOServerCnxn extends Serve
pw.print("=");
pw.println(e.getValue());
}
-
- }
+
+ }
}
-
+
private class ConfCommand extends CommandThread {
ConfCommand(PrintWriter pw) {
super(pw);
}
-
+
@Override
public void commandRun() {
if (zkServer == null) {
@@ -572,38 +597,36 @@ public class NIOServerCnxn extends Serve
}
}
}
-
+
private class StatResetCommand extends CommandThread {
public StatResetCommand(PrintWriter pw) {
super(pw);
}
-
+
@Override
public void commandRun() {
if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
}
- else {
+ else {
zkServer.serverStats().reset();
pw.println("Server stats reset.");
}
}
}
-
+
private class CnxnStatResetCommand extends CommandThread {
public CnxnStatResetCommand(PrintWriter pw) {
super(pw);
}
-
+
@Override
public void commandRun() {
if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
} else {
- synchronized(factory.cnxns){
- for(ServerCnxn c : factory.cnxns){
- c.resetStats();
- }
+ for(ServerCnxn c : factory.cnxns){
+ c.resetStats();
}
pw.println("Connection stats reset.");
}
@@ -614,7 +637,7 @@ public class NIOServerCnxn extends Serve
public DumpCommand(PrintWriter pw) {
super(pw);
}
-
+
@Override
public void commandRun() {
if (zkServer == null) {
@@ -625,24 +648,26 @@ public class NIOServerCnxn extends Serve
zkServer.sessionTracker.dumpSessions(pw);
pw.println("ephemeral nodes dump:");
zkServer.dumpEphemerals(pw);
+ pw.println("Connections dump:");
+ factory.dumpConnections(pw);
}
}
}
-
+
private class StatCommand extends CommandThread {
int len;
public StatCommand(PrintWriter pw, int len) {
super(pw);
this.len = len;
}
-
+
@SuppressWarnings("unchecked")
@Override
public void commandRun() {
if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
}
- else {
+ else {
pw.print("Zookeeper version: ");
pw.println(Version.getFullVersion());
if (zkServer instanceof ReadOnlyZooKeeperServer) {
@@ -652,14 +677,7 @@ public class NIOServerCnxn extends Serve
if (len == statCmd) {
LOG.info("Stat command output");
pw.println("Clients:");
- // clone should be faster than iteration
- // ie give up the cnxns lock faster
- HashSet<NIOServerCnxn> cnxnset;
- synchronized(factory.cnxns){
- cnxnset = (HashSet<NIOServerCnxn>)factory
- .cnxns.clone();
- }
- for(NIOServerCnxn c : cnxnset){
+ for(ServerCnxn c : factory.cnxns){
c.dumpConnectionInfo(pw, true);
pw.println();
}
@@ -669,28 +687,22 @@ public class NIOServerCnxn extends Serve
pw.print("Node count: ");
pw.println(zkServer.getZKDatabase().getNodeCount());
}
-
+
}
}
-
+
private class ConsCommand extends CommandThread {
public ConsCommand(PrintWriter pw) {
super(pw);
}
-
+
@SuppressWarnings("unchecked")
@Override
public void commandRun() {
if (zkServer == null) {
pw.println(ZK_NOT_SERVING);
} else {
- // clone should be faster than iteration
- // ie give up the cnxns lock faster
- HashSet<NIOServerCnxn> cnxns;
- synchronized (factory.cnxns) {
- cnxns = (HashSet<NIOServerCnxn>) factory.cnxns.clone();
- }
- for (NIOServerCnxn c : cnxns) {
+ for (ServerCnxn c : factory.cnxns) {
c.dumpConnectionInfo(pw, false);
pw.println();
}
@@ -698,7 +710,7 @@ public class NIOServerCnxn extends Serve
}
}
}
-
+
private class WatchCommand extends CommandThread {
int len = 0;
public WatchCommand(PrintWriter pw, int len) {
@@ -821,7 +833,7 @@ public class NIOServerCnxn extends Serve
/** cancel the selection key to remove the socket handling
* from selector. This is to prevent netcat problem wherein
* netcat immediately closes the sending side after sending the
- * commands and still keeps the receiving channel open.
+ * commands and still keeps the receiving channel open.
* The idea is to remove the selectionkey from the selector
* so that the selector does not notice the closed read on the
* socket channel and keep the socket alive to write the data to
@@ -897,6 +909,9 @@ public class NIOServerCnxn extends Serve
IsroCommand isro = new IsroCommand(pwriter);
isro.start();
return true;
+ } else if (len == telnetCloseCmd) {
+ cleanupWriterSocket(null);
+ return true;
}
return false;
}
@@ -925,11 +940,7 @@ public class NIOServerCnxn extends Serve
}
public long getOutstandingRequests() {
- synchronized (this) {
- synchronized (this.factory) {
- return outstandingRequests;
- }
- }
+ return outstandingRequests.get();
}
/*
@@ -941,53 +952,45 @@ public class NIOServerCnxn extends Serve
return sessionTimeout;
}
+ /**
+ * Used by "dump" 4-letter command to list all connection in
+ * cnxnExpiryMap
+ */
@Override
public String toString() {
- return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
+ return "ip: " + sock.socket().getRemoteSocketAddress() +
+ " sessionId: 0x" + Long.toHexString(sessionId);
}
- /*
+ /**
* Close the cnxn and remove it from the factory cnxns list.
- *
- * This function returns immediately if the cnxn is not on the cnxns list.
*/
@Override
public void close() {
- synchronized(factory.cnxns){
- // if this is not in cnxns then it's already closed
- if (!factory.cnxns.remove(this)) {
- return;
- }
+ if (!factory.removeCnxn(this)) {
+ return;
+ }
- synchronized (factory.ipMap) {
- Set<NIOServerCnxn> s =
- factory.ipMap.get(sock.socket().getInetAddress());
- s.remove(this);
- }
-
- factory.unregisterConnection(this);
-
- if (zkServer != null) {
- zkServer.removeCnxn(this);
- }
-
- closeSock();
-
- if (sk != null) {
- try {
- // need to cancel this selection key from the selector
- sk.cancel();
- } catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("ignoring exception during selectionkey cancel", e);
- }
+ if (zkServer != null) {
+ zkServer.removeCnxn(this);
+ }
+
+ if (sk != null) {
+ try {
+ // need to cancel this selection key from the selector
+ sk.cancel();
+ } catch (Exception e) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ignoring exception during selectionkey cancel", e);
}
}
}
+
+ closeSock();
}
/**
- * Close resources associated with the sock of this cnxn.
+ * Close resources associated with the sock of this cnxn.
*/
private void closeSock() {
if (sock == null) {
@@ -999,6 +1002,18 @@ public class NIOServerCnxn extends Serve
+ (sessionId != 0 ?
" which had sessionid 0x" + Long.toHexString(sessionId) :
" (no session established for client)"));
+ closeSock(sock);
+ sock = null;
+ }
+
+ /**
+ * Close resources associated with a sock.
+ */
+ public static void closeSock(SocketChannel sock) {
+ if (sock == null) {
+ return;
+ }
+
try {
/*
* The following sequence of code is stupid! You would think that
@@ -1030,18 +1045,13 @@ public class NIOServerCnxn extends Serve
}
try {
sock.close();
- // XXX The next line doesn't seem to be needed, but some posts
- // to forums suggest that it is needed. Keep in mind if errors in
- // this section arise.
- // factory.selector.wakeup();
} catch (IOException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("ignoring exception during socketchannel close", e);
}
}
- sock = null;
}
-
+
private final static byte fourBytes[] = new byte[4];
/*
@@ -1051,7 +1061,7 @@ public class NIOServerCnxn extends Serve
* org.apache.jute.Record, java.lang.String)
*/
@Override
- synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
+ public void sendResponse(ReplyHeader h, Record r, String tag) {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
@@ -1071,16 +1081,10 @@ public class NIOServerCnxn extends Serve
bb.putInt(b.length - 4).rewind();
sendBuffer(bb);
if (h.getXid() > 0) {
- synchronized(this){
- outstandingRequests--;
- }
// check throttling
- synchronized (this.factory) {
- if (zkServer.getInProcess() < outstandingLimit
- || outstandingRequests < 1) {
- sk.selector().wakeup();
- enableRecv();
- }
+ if (outstandingRequests.decrementAndGet() < 1 ||
+ zkServer.getInProcess() < outstandingLimit) {
+ enableRecv();
}
}
} catch(Exception e) {
@@ -1094,7 +1098,7 @@ public class NIOServerCnxn extends Serve
* @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent)
*/
@Override
- synchronized public void process(WatchedEvent event) {
+ public void process(WatchedEvent event) {
ReplyHeader h = new ReplyHeader(-1, -1L, 0);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
@@ -1122,16 +1126,28 @@ public class NIOServerCnxn extends Serve
@Override
public void setSessionId(long sessionId) {
this.sessionId = sessionId;
+ factory.addSession(sessionId, this);
}
@Override
public void setSessionTimeout(int sessionTimeout) {
this.sessionTimeout = sessionTimeout;
+ factory.touchCnxn(this);
}
@Override
public int getInterestOps() {
- return sk.isValid() ? sk.interestOps() : 0;
+ if (!isSelectable()) {
+ return 0;
+ }
+ int interestOps = 0;
+ if (getReadInterest()) {
+ interestOps |= SelectionKey.OP_READ;
+ }
+ if (getWriteInterest()) {
+ interestOps |= SelectionKey.OP_WRITE;
+ }
+ return interestOps;
}
@Override
@@ -1142,6 +1158,13 @@ public class NIOServerCnxn extends Serve
return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
}
+ public InetAddress getSocketAddress() {
+ if (sock == null) {
+ return null;
+ }
+ return sock.socket().getInetAddress();
+ }
+
@Override
protected ServerStats serverStats() {
if (zkServer == null) {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=1423990&r1=1423989&r2=1423990&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Wed Dec 19 18:07:14 2012
@@ -19,25 +19,73 @@
package org.apache.zookeeper.server;
import java.io.IOException;
+import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.InetSocketAddress;
+import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
+/**
+ * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
+ * NIO non-blocking socket calls. Communication between threads is handled via
+ * queues.
+ *
+ * - 1 accept thread, which accepts new connections and assigns to a
+ * selector thread
+ * - 1-N selector threads, each of which selects on 1/N of the connections.
+ * The reason the factory supports more than one selector thread is that
+ * with large numbers of connections, select() itself can become a
+ * performance bottleneck.
+ * - 0-M socket I/O worker threads, which perform basic socket reads and
+ * writes. If configured with 0 worker threads, the selector threads
+ * do the socket I/O directly.
+ * - 1 connection expiration thread, which closes idle connections; this is
+ * necessary to expire connections on which no session is established.
+ *
+ * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
+ * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
+ */
+public class NIOServerCnxnFactory extends ServerCnxnFactory {
private static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxnFactory.class);
+ /** Default sessionless connection timeout in ms: 10000 (10s) */
+ public static final String ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT =
+ "zookeeper.nio.sessionlessCnxnTimeout";
+ /**
+ * With 500 connections to an observer with watchers firing on each, is
+ * unable to exceed 1GigE rates with only 1 selector.
+ * Defaults to using 2 selector threads with 8 cores and 4 with 32 cores.
+ * Expressed as sqrt(numCores/2). Must have at least 1 selector thread.
+ */
+ public static final String ZOOKEEPER_NIO_NUM_SELECTOR_THREADS =
+ "zookeeper.nio.numSelectorThreads";
+ /** Default: 2 * numCores */
+ public static final String ZOOKEEPER_NIO_NUM_WORKER_THREADS =
+ "zookeeper.nio.numWorkerThreads";
+ /** Default: 64kB */
+ public static final String ZOOKEEPER_NIO_DIRECT_BUFFER_BYTES =
+ "zookeeper.nio.directBufferBytes";
+ /** Default worker pool shutdown timeout in ms: 5000 (5s) */
+ public static final String ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT =
+ "zookeeper.nio.shutdownTimeout";
+
static {
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
@@ -54,47 +102,565 @@ public class NIOServerCnxnFactory extend
} catch(IOException ie) {
LOG.error("Selector failed to open", ie);
}
+
+ /**
+ * Value of 0 disables use of direct buffers and instead uses
+ * gathered write call.
+ *
+ * Default to using 64k direct buffers.
+ */
+ directBufferBytes = Integer.getInteger(
+ ZOOKEEPER_NIO_DIRECT_BUFFER_BYTES, 64 * 1024);
}
- ServerSocketChannel ss;
+ /**
+ * AbstractSelectThread is an abstract base class containing a few bits
+ * of code shared by the AcceptThread (which selects on the listen socket)
+ * and SelectorThread (which selects on client connections) classes.
+ */
+ private abstract class AbstractSelectThread extends Thread {
+ protected final Selector selector;
+
+ public AbstractSelectThread(String name) throws IOException {
+ super(name);
+ // Allows the JVM to shutdown even if this thread is still running.
+ setDaemon(true);
+ this.selector = Selector.open();
+ }
+
+ public void wakeupSelector() {
+ selector.wakeup();
+ }
+
+ protected void cleanupSelectionKey(SelectionKey key) {
+ if (key != null) {
+ try {
+ key.cancel();
+ } catch (Exception ex) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("ignoring exception during selectionkey cancel", ex);
+ }
+ }
+ }
+ }
- final Selector selector = Selector.open();
+ protected void fastCloseSock(SocketChannel sc) {
+ if (sc != null) {
+ try {
+ // Hard close immediately, discarding buffers
+ sc.socket().setSoLinger(true, 0);
+ } catch (SocketException e) {
+ LOG.warn("Unable to set socket linger to 0, socket close"
+ + " may stall in CLOSE_WAIT", e);
+ }
+ NIOServerCnxn.closeSock(sc);
+ }
+ }
+ }
/**
- * We use this buffer to do efficient socket I/O. Since there is a single
- * sender thread per NIOServerCnxn instance, we can use a member variable to
- * only allocate it once.
- */
- final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
+ * There is a single AcceptThread which accepts new connections and assigns
+ * them to a SelectorThread using a simple round-robin scheme to spread
+ * them across the SelectorThreads. It enforces maximum number of
+ * connections per IP and attempts to cope with running out of file
+ * descriptors by briefly sleeping before retrying.
+ */
+ private class AcceptThread extends AbstractSelectThread {
+ private final ServerSocketChannel acceptSocket;
+ private final SelectionKey acceptKey;
+ private final RateLogger acceptErrorLogger = new RateLogger(LOG);
+ private final Collection<SelectorThread> selectorThreads;
+ private Iterator<SelectorThread> selectorIterator;
+
+ public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
+ Set<SelectorThread> selectorThreads) throws IOException {
+ super("NIOServerCxnFactory.AcceptThread:" + addr);
+ this.acceptSocket = ss;
+ this.acceptKey =
+ acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
+ this.selectorThreads = Collections.unmodifiableList(
+ new ArrayList<SelectorThread>(selectorThreads));
+ selectorIterator = this.selectorThreads.iterator();
+ }
- final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
- new HashMap<InetAddress, Set<NIOServerCnxn>>( );
+ public void run() {
+ try {
+ while (!stopped && !acceptSocket.socket().isClosed()) {
+ try {
+ select();
+ } catch (RuntimeException e) {
+ LOG.warn("Ignoring unexpected runtime exception", e);
+ } catch (Exception e) {
+ LOG.warn("Ignoring unexpected exception", e);
+ }
+ }
+ } finally {
+ // This will wake up the selector threads, and tell the
+ // worker thread pool to begin shutdown.
+ NIOServerCnxnFactory.this.stop();
+ LOG.info("accept thread exitted run method");
+ }
+ }
+
+ private void select() {
+ try {
+ selector.select();
- int maxClientCnxns = 60;
+ Iterator<SelectionKey> selectedKeys =
+ selector.selectedKeys().iterator();
+ while (!stopped && selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ if (!key.isValid()) {
+ continue;
+ }
+ if (key.isAcceptable()) {
+ if (!doAccept()) {
+ // If unable to pull a new connection off the accept
+ // queue, pause accepting to give us time to free
+ // up file descriptors and so the accept thread
+ // doesn't spin in a tight loop.
+ pauseAccept(10);
+ }
+ } else {
+ LOG.warn("Unexpected ops in accept select "
+ + key.readyOps());
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Ignoring IOException while selecting", e);
+ }
+ }
+
+ /**
+ * Mask off the listen socket interest ops and use select() to sleep
+ * so that other threads can wake us up by calling wakeup() on the
+ * selector.
+ */
+ private void pauseAccept(long millisecs) {
+ acceptKey.interestOps(0);
+ try {
+ selector.select(millisecs);
+ } catch (IOException e) {
+ // ignore
+ } finally {
+ acceptKey.interestOps(SelectionKey.OP_ACCEPT);
+ }
+ }
+
+ /**
+ * Accept new socket connections. Enforces maximum number of connections
+ * per client IP address. Round-robin assigns to selector thread for
+ * handling. Returns whether pulled a connection off the accept queue
+ * or not. If encounters an error attempts to fast close the socket.
+ *
+ * @return whether was able to accept a connection or not
+ */
+ private boolean doAccept() {
+ boolean accepted = false;
+ SocketChannel sc = null;
+ try {
+ sc = acceptSocket.accept();
+ accepted = true;
+ InetAddress ia = sc.socket().getInetAddress();
+ int cnxncount = getClientCnxnCount(ia);
+
+ if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
+ throw new IOException("Too many connections from " + ia
+ + " - max is " + maxClientCnxns );
+ }
+
+ LOG.info("Accepted socket connection from "
+ + sc.socket().getRemoteSocketAddress());
+ sc.configureBlocking(false);
+
+ // Round-robin assign this connection to a selector thread
+ if (!selectorIterator.hasNext()) {
+ selectorIterator = selectorThreads.iterator();
+ }
+ SelectorThread selectorThread = selectorIterator.next();
+ if (!selectorThread.addAcceptedConnection(sc)) {
+ throw new IOException(
+ "Unable to add connection to selector queue"
+ + (stopped ? " (shutdown in progress)" : ""));
+ }
+ acceptErrorLogger.flush();
+ } catch (IOException e) {
+ // accept, maxClientCnxns, configureBlocking
+ acceptErrorLogger.rateLimitLog(
+ "Error accepting new connection: " + e.getMessage());
+ fastCloseSock(sc);
+ }
+ return accepted;
+ }
+ }
+
+ /**
+ * The SelectorThread receives newly accepted connections from the
+ * AcceptThread and is responsible for selecting for I/O readiness
+ * across the connections. This thread is the only thread that performs
+ * any non-threadsafe or potentially blocking calls on the selector
+ * (registering new connections and reading/writing interest ops).
+ *
+ * Assignment of a connection to a SelectorThread is permanent and only
+ * one SelectorThread will ever interact with the connection. There are
+ * 1-N SelectorThreads, with connections evenly apportioned between the
+ * SelectorThreads.
+ *
+ * If there is a worker thread pool, when a connection has I/O to perform
+ * the SelectorThread removes it from selection by clearing its interest
+ * ops and schedules the I/O for processing by a worker thread. When the
+ * work is complete, the connection is placed on the ready queue to have
+ * its interest ops restored and resume selection.
+ *
+ * If there is no worker thread pool, the SelectorThread performs the I/O
+ * directly.
+ */
+ class SelectorThread extends AbstractSelectThread {
+ private final int id;
+ private final Queue<SocketChannel> acceptedQueue;
+ private final Queue<SelectionKey> updateQueue;
+
+ public SelectorThread(int id) throws IOException {
+ super("NIOServerCxnFactory.SelectorThread-" + id);
+ this.id = id;
+ acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
+ updateQueue = new LinkedBlockingQueue<SelectionKey>();
+ }
+
+ /**
+ * Place new accepted connection onto a queue for adding. Do this
+ * so only the selector thread modifies what keys are registered
+ * with the selector.
+ */
+ public boolean addAcceptedConnection(SocketChannel accepted) {
+ if (stopped || !acceptedQueue.offer(accepted)) {
+ return false;
+ }
+ wakeupSelector();
+ return true;
+ }
+
+ /**
+ * Place interest op update requests onto a queue so that only the
+ * selector thread modifies interest ops, because interest ops
+ * reads/sets are potentially blocking operations if other select
+ * operations are happening.
+ */
+ public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
+ if (stopped || !updateQueue.offer(sk)) {
+ return false;
+ }
+ wakeupSelector();
+ return true;
+ }
+
+ /**
+ * The main loop for the thread selects() on the connections and
+ * dispatches ready I/O work requests, then registers all pending
+ * newly accepted connections and updates any interest ops on the
+ * queue.
+ */
+ public void run() {
+ try {
+ while (!stopped) {
+ try {
+ select();
+ processAcceptedConnections();
+ processInterestOpsUpdateRequests();
+ } catch (RuntimeException e) {
+ LOG.warn("Ignoring unexpected runtime exception", e);
+ } catch (Exception e) {
+ LOG.warn("Ignoring unexpected exception", e);
+ }
+ }
+
+ // Close connections still pending on the selector. Any others
+ // with in-flight work, let drain out of the work queue.
+ for (SelectionKey key : selector.keys()) {
+ NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
+ if (cnxn.isSelectable()) {
+ cnxn.close();
+ }
+ cleanupSelectionKey(key);
+ }
+ SocketChannel accepted;
+ while ((accepted = acceptedQueue.poll()) != null) {
+ fastCloseSock(accepted);
+ }
+ updateQueue.clear();
+ } finally {
+ // This will wake up the accept thread and the other selector
+ // threads, and tell the worker thread pool to begin shutdown.
+ NIOServerCnxnFactory.this.stop();
+ LOG.info("selector thread exitted run method");
+ }
+ }
+
+ private void select() {
+ try {
+ selector.select();
+
+ Set<SelectionKey> selected = selector.selectedKeys();
+ ArrayList<SelectionKey> selectedList =
+ new ArrayList<SelectionKey>(selected);
+ Collections.shuffle(selectedList);
+ Iterator<SelectionKey> selectedKeys = selectedList.iterator();
+ while(!stopped && selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selected.remove(key);
+
+ if (!key.isValid()) {
+ cleanupSelectionKey(key);
+ continue;
+ }
+ if (key.isReadable() || key.isWritable()) {
+ handleIO(key);
+ } else {
+ LOG.warn("Unexpected ops in select " + key.readyOps());
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Ignoring IOException while selecting", e);
+ }
+ }
+
+ /**
+ * Schedule I/O for processing on the connection associated with
+ * the given SelectionKey. If a worker thread pool is not being used,
+ * I/O is run directly by this thread.
+ */
+ private void handleIO(SelectionKey key) {
+ IOWorkRequest workRequest = new IOWorkRequest(this, key);
+ NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
+
+ // Stop selecting this key while processing on its
+ // connection
+ cnxn.disableSelectable();
+ key.interestOps(0);
+ touchCnxn(cnxn);
+ workerPool.schedule(workRequest);
+ }
+
+ /**
+ * Iterate over the queue of accepted connections that have been
+ * assigned to this thread but not yet placed on the selector.
+ */
+ private void processAcceptedConnections() {
+ SocketChannel accepted;
+ while (!stopped && (accepted = acceptedQueue.poll()) != null) {
+ SelectionKey key = null;
+ try {
+ key = accepted.register(selector, SelectionKey.OP_READ);
+ NIOServerCnxn cnxn = createConnection(accepted, key, this);
+ key.attach(cnxn);
+ addCnxn(cnxn);
+ } catch (IOException e) {
+ // register, createConnection
+ cleanupSelectionKey(key);
+ fastCloseSock(accepted);
+ }
+ }
+ }
+
+ /**
+ * Iterate over the queue of connections ready to resume selection,
+ * and restore their interest ops selection mask.
+ */
+ private void processInterestOpsUpdateRequests() {
+ SelectionKey key;
+ while (!stopped && (key = updateQueue.poll()) != null) {
+ if (!key.isValid()) {
+ cleanupSelectionKey(key);
+ }
+ NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
+ if (cnxn.isSelectable()) {
+ key.interestOps(cnxn.getInterestOps());
+ }
+ }
+ }
+ }
+
+ /**
+ * IOWorkRequest is a small wrapper class to allow doIO() calls to be
+ * run on a connection using a WorkerService.
+ */
+ private class IOWorkRequest extends WorkerService.WorkRequest {
+ private final SelectorThread selectorThread;
+ private final SelectionKey key;
+ private final NIOServerCnxn cnxn;
+
+ IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
+ this.selectorThread = selectorThread;
+ this.key = key;
+ this.cnxn = (NIOServerCnxn) key.attachment();
+ }
+
+ public void doWork() throws InterruptedException {
+ if (!key.isValid()) {
+ selectorThread.cleanupSelectionKey(key);
+ return;
+ }
+
+ if (key.isReadable() || key.isWritable()) {
+ cnxn.doIO(key);
+
+ // Check if we shutdown or doIO() closed this connection
+ if (stopped) {
+ cnxn.close();
+ return;
+ }
+ if (!key.isValid()) {
+ selectorThread.cleanupSelectionKey(key);
+ return;
+ }
+ touchCnxn(cnxn);
+ }
+
+ // Mark this connection as once again ready for selection
+ cnxn.enableSelectable();
+ // Push an update request on the queue to resume selecting
+ // on the current set of interest ops, which may have changed
+ // as a result of the I/O operations we just performed.
+ if (!selectorThread.addInterestOpsUpdateRequest(key)) {
+ cnxn.close();
+ }
+ }
+
+ @Override
+ public void cleanup() {
+ cnxn.close();
+ }
+ }
+
+ /**
+ * This thread is responsible for closing stale connections so that
+ * connections on which no session is established are properly expired.
+ */
+ private class ConnectionExpirerThread extends Thread {
+ ConnectionExpirerThread() {
+ super("ConnnectionExpirer");
+ }
+
+ public void run() {
+ try {
+ while (!stopped) {
+ long waitTime = cnxnExpiryQueue.getWaitTime();
+ if (waitTime > 0) {
+ Thread.sleep(waitTime);
+ continue;
+ }
+ for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
+ conn.close();
+ }
+ }
+
+ } catch (InterruptedException e) {
+ LOG.info("ConnnectionExpirerThread interrupted");
+ }
+ }
+ }
+
+ ServerSocketChannel ss;
+
+ /**
+ * We use this buffer to do efficient socket I/O. Because I/O is handled
+ * by the worker threads (or the selector threads directly, if no worker
+ * thread pool is created), we can create a fixed set of these to be
+ * shared by connections.
+ */
+ private static final ThreadLocal<ByteBuffer> directBuffer =
+ new ThreadLocal<ByteBuffer>() {
+ @Override protected ByteBuffer initialValue() {
+ return ByteBuffer.allocateDirect(directBufferBytes);
+ }
+ };
+
+ public static ByteBuffer getDirectBuffer() {
+ return directBufferBytes > 0 ? directBuffer.get() : null;
+ }
+
+ // sessionMap is used by closeSession()
+ private final ConcurrentHashMap<Long, NIOServerCnxn> sessionMap =
+ new ConcurrentHashMap<Long, NIOServerCnxn>();
+ // ipMap is used to limit connections per IP
+ private final ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
+ new ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>>( );
+
+ protected int maxClientCnxns = 60;
+
+ int sessionlessCnxnTimeout;
+ private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;
+
+
+ protected WorkerService workerPool;
+
+ private static int directBufferBytes;
+ private int numSelectorThreads;
+ private int numWorkerThreads;
+ private long workerShutdownTimeoutMS;
/**
* Construct a new server connection factory which will accept an unlimited number
* of concurrent connections from each client (up to the file descriptor
* limits of the operating system). startup(zks) must be called subsequently.
- * @throws IOException
*/
- public NIOServerCnxnFactory() throws IOException {
+ public NIOServerCnxnFactory() {
}
- Thread thread;
+ private volatile boolean stopped = true;
+ private ConnectionExpirerThread expirerThread;
+ private AcceptThread acceptThread;
+ private final Set<SelectorThread> selectorThreads =
+ new HashSet<SelectorThread>();
+
@Override
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
configureSaslLogin();
- thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
- thread.setDaemon(true);
maxClientCnxns = maxcc;
+ sessionlessCnxnTimeout = Integer.getInteger(
+ ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
+ // We also use the sessionlessCnxnTimeout as expiring interval for
+ // cnxnExpiryQueue. These don't need to be the same, but the expiring
+ // interval passed into the ExpiryQueue() constructor below should be
+ // less than or equal to the timeout.
+ cnxnExpiryQueue =
+ new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
+ expirerThread = new ConnectionExpirerThread();
+
+ int numCores = Runtime.getRuntime().availableProcessors();
+ // 32 cores sweet spot seems to be 4 selector threads
+ numSelectorThreads = Integer.getInteger(
+ ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
+ Math.max((int) Math.sqrt((float) numCores/2), 1));
+ if (numSelectorThreads < 1) {
+ throw new IOException("numSelectorThreads must be at least 1");
+ }
+
+ numWorkerThreads = Integer.getInteger(
+ ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
+ workerShutdownTimeoutMS = Long.getLong(
+ ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
+
+ LOG.info("Configuring NIO connection handler with "
+ + (sessionlessCnxnTimeout/1000) + "s sessionless connection"
+ + " timeout, " + numSelectorThreads + " selector thread(s), "
+ + (numWorkerThreads > 0 ? numWorkerThreads : "no")
+ + " worker threads, and "
+ + (directBufferBytes == 0 ? "gathered writes." :
+ ("" + (directBufferBytes/1024) + " kB direct buffers.")));
+ for(int i=0; i<numSelectorThreads; ++i) {
+ selectorThreads.add(new SelectorThread(i));
+ }
+
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
LOG.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
- ss.register(selector, SelectionKey.OP_ACCEPT);
+ acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
/** {@inheritDoc} */
@@ -109,9 +675,22 @@ public class NIOServerCnxnFactory extend
@Override
public void start() {
+ stopped = false;
+ if (workerPool == null) {
+ workerPool = new WorkerService(
+ "NIOWorker", numWorkerThreads, false);
+ }
+ for(SelectorThread thread : selectorThreads) {
+ if (thread.getState() == Thread.State.NEW) {
+ thread.start();
+ }
+ }
// ensure thread is started once and only once
- if (thread.getState() == Thread.State.NEW) {
- thread.start();
+ if (acceptThread.getState() == Thread.State.NEW) {
+ acceptThread.start();
+ }
+ if (expirerThread.getState() == Thread.State.NEW) {
+ expirerThread.start();
}
}
@@ -134,94 +713,79 @@ public class NIOServerCnxnFactory extend
return ss.socket().getLocalPort();
}
- private void addCnxn(NIOServerCnxn cnxn) {
- synchronized (cnxns) {
- cnxns.add(cnxn);
- synchronized (ipMap){
- InetAddress addr = cnxn.sock.socket().getInetAddress();
- Set<NIOServerCnxn> s = ipMap.get(addr);
- if (s == null) {
- // in general we will see 1 connection from each
- // host, setting the initial cap to 2 allows us
- // to minimize mem usage in the common case
- // of 1 entry -- we need to set the initial cap
- // to 2 to avoid rehash when the first entry is added
- s = new HashSet<NIOServerCnxn>(2);
- s.add(cnxn);
- ipMap.put(addr,s);
- } else {
- s.add(cnxn);
- }
+ /**
+ * De-registers the connection from the various mappings maintained
+ * by the factory.
+ */
+ public boolean removeCnxn(NIOServerCnxn cnxn) {
+ // If the connection is not in the master list it's already been closed
+ if (!cnxns.remove(cnxn)) {
+ return false;
+ }
+ cnxnExpiryQueue.remove(cnxn);
+
+ long sessionId = cnxn.getSessionId();
+ if (sessionId != 0) {
+ sessionMap.remove(sessionId);
+ }
+
+ InetAddress addr = cnxn.getSocketAddress();
+ if (addr != null) {
+ Set<NIOServerCnxn> set = ipMap.get(addr);
+ if (set != null) {
+ set.remove(cnxn);
+ // Note that we make no effort here to remove empty mappings
+ // from ipMap.
}
}
- }
- protected NIOServerCnxn createConnection(SocketChannel sock,
- SelectionKey sk) throws IOException {
- return new NIOServerCnxn(zkServer, sock, sk, this);
+ // unregister from JMX
+ unregisterConnection(cnxn);
+ return true;
}
- private int getClientCnxnCount(InetAddress cl) {
- // The ipMap lock covers both the map, and its contents
- // (that is, the cnxn sets shouldn't be modified outside of
- // this lock)
- synchronized (ipMap) {
- Set<NIOServerCnxn> s = ipMap.get(cl);
- if (s == null) return 0;
- return s.size();
- }
+ /**
+ * Add or update cnxn in our cnxnExpiryQueue
+ * @param cnxn
+ */
+ public void touchCnxn(NIOServerCnxn cnxn) {
+ cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
}
- public void run() {
- while (!ss.socket().isClosed()) {
- try {
- selector.select(1000);
- Set<SelectionKey> selected;
- synchronized (this) {
- selected = selector.selectedKeys();
- }
- ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
- selected);
- Collections.shuffle(selectedList);
- for (SelectionKey k : selectedList) {
- if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
- SocketChannel sc = ((ServerSocketChannel) k
- .channel()).accept();
- InetAddress ia = sc.socket().getInetAddress();
- int cnxncount = getClientCnxnCount(ia);
- if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
- LOG.warn("Too many connections from " + ia
- + " - max is " + maxClientCnxns );
- sc.close();
- } else {
- LOG.info("Accepted socket connection from "
- + sc.socket().getRemoteSocketAddress());
- sc.configureBlocking(false);
- SelectionKey sk = sc.register(selector,
- SelectionKey.OP_READ);
- NIOServerCnxn cnxn = createConnection(sc, sk);
- sk.attach(cnxn);
- addCnxn(cnxn);
- }
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
- NIOServerCnxn c = (NIOServerCnxn) k.attachment();
- c.doIO(k);
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Unexpected ops in select "
- + k.readyOps());
- }
- }
- }
- selected.clear();
- } catch (RuntimeException e) {
- LOG.warn("Ignoring unexpected runtime exception", e);
- } catch (Exception e) {
- LOG.warn("Ignoring exception", e);
+ private void addCnxn(NIOServerCnxn cnxn) {
+ InetAddress addr = cnxn.getSocketAddress();
+ Set<NIOServerCnxn> set = ipMap.get(addr);
+ if (set == null) {
+ // in general we will see 1 connection from each
+ // host, setting the initial cap to 2 allows us
+ // to minimize mem usage in the common case
+ // of 1 entry -- we need to set the initial cap
+ // to 2 to avoid rehash when the first entry is added
+ // Construct a ConcurrentHashSet using a ConcurrentHashMap
+ set = Collections.newSetFromMap(
+ new ConcurrentHashMap<NIOServerCnxn, Boolean>(2));
+ // Put the new set in the map, but only if another thread
+ // hasn't beaten us to it
+ Set<NIOServerCnxn> existingSet = ipMap.putIfAbsent(addr, set);
+ if (existingSet != null) {
+ set = existingSet;
}
}
- closeAll();
- LOG.info("NIOServerCnxn factory exited run method");
+ set.add(cnxn);
+
+ cnxns.add(cnxn);
+ touchCnxn(cnxn);
+ }
+
+ protected NIOServerCnxn createConnection(SocketChannel sock,
+ SelectionKey sk, SelectorThread selectorThread) throws IOException {
+ return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
+ }
+
+ private int getClientCnxnCount(InetAddress cl) {
+ Set<NIOServerCnxn> s = ipMap.get(cl);
+ if (s == null) return 0;
+ return s.size();
}
/**
@@ -230,30 +794,54 @@ public class NIOServerCnxnFactory extend
*/
@Override
@SuppressWarnings("unchecked")
- synchronized public void closeAll() {
- selector.wakeup();
- HashSet<NIOServerCnxn> cnxns;
- synchronized (this.cnxns) {
- cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
- }
- // got to clear all the connections that we have in the selector
- for (NIOServerCnxn cnxn: cnxns) {
+ public void closeAll() {
+ // clear all the connections on which we are selecting
+ for (ServerCnxn cnxn : cnxns) {
try {
- // don't hold this.cnxns lock as deadlock may occur
+ // This will remove the cnxn from cnxns
cnxn.close();
} catch (Exception e) {
LOG.warn("Ignoring exception closing cnxn sessionid 0x"
- + Long.toHexString(cnxn.sessionId), e);
+ + Long.toHexString(cnxn.getSessionId()), e);
}
}
}
- public void shutdown() {
+ public void stop() {
+ stopped = true;
+
+ // Stop queuing connection attempts
try {
ss.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing listen socket", e);
+ }
+
+ if (acceptThread != null) {
+ acceptThread.wakeupSelector();
+ }
+ if (expirerThread != null) {
+ expirerThread.interrupt();
+ }
+ for (SelectorThread thread : selectorThreads) {
+ thread.wakeupSelector();
+ }
+ if (workerPool != null) {
+ workerPool.stop();
+ }
+ }
+
+ public void shutdown() {
+ try {
+ // close listen socket and signal selector threads to stop
+ stop();
+
+ // wait for selector and worker threads to shutdown
+ join();
+
+ // close all open connections
closeAll();
- thread.interrupt();
- thread.join();
+
if (login != null) {
login.shutdown();
}
@@ -262,48 +850,45 @@ public class NIOServerCnxnFactory extend
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception during shutdown", e);
}
- try {
- selector.close();
- } catch (IOException e) {
- LOG.warn("Selector closing", e);
- }
+
if (zkServer != null) {
zkServer.shutdown();
}
}
- @Override
- public synchronized void closeSession(long sessionId) {
- selector.wakeup();
- closeSessionWithoutWakeup(sessionId);
+ public void addSession(long sessionId, NIOServerCnxn cnxn) {
+ sessionMap.put(sessionId, cnxn);
}
- @SuppressWarnings("unchecked")
- private void closeSessionWithoutWakeup(long sessionId) {
- HashSet<NIOServerCnxn> cnxns;
- synchronized (this.cnxns) {
- cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
- }
-
- for (NIOServerCnxn cnxn : cnxns) {
- if (cnxn.getSessionId() == sessionId) {
- try {
- cnxn.close();
- } catch (Exception e) {
- LOG.warn("exception during session close", e);
- }
- break;
- }
+ @Override
+ public void closeSession(long sessionId) {
+ NIOServerCnxn cnxn = sessionMap.remove(sessionId);
+ if (cnxn != null) {
+ cnxn.close();
}
}
@Override
public void join() throws InterruptedException {
- thread.join();
+ if (acceptThread != null) {
+ acceptThread.join();
+ }
+ for (SelectorThread thread : selectorThreads) {
+ thread.join();
+ }
+ if (workerPool != null) {
+ workerPool.join(workerShutdownTimeoutMS);
+ }
}
@Override
public Iterable<ServerCnxn> getConnections() {
return cnxns;
}
+
+ public void dumpConnections(PrintWriter pwriter) {
+ pwriter.print("Connections ");
+ cnxnExpiryQueue.dump(pwriter);
+ }
+
}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java?rev=1423990&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/RateLogger.java Wed Dec 19 18:07:14 2012
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.slf4j.Logger;
+
+public class RateLogger {
+ public RateLogger(Logger log) {
+ LOG = log;
+ }
+
+ private final Logger LOG;
+ private String msg = null;
+ private long timestamp;
+ private int count = 0;
+
+ public void flush() {
+ if (msg != null) {
+ if (count > 1) {
+ LOG.warn("[" + count + " times] " + msg);
+ } else if (count == 1) {
+ LOG.warn(msg);
+ }
+ }
+ msg = null;
+ count = 0;
+ }
+
+ public void rateLimitLog(String newMsg) {
+ long now = System.currentTimeMillis();
+ if (newMsg.equals(msg)) {
+ ++count;
+ if (now - timestamp >= 100) {
+ flush();
+ msg = newMsg;
+ timestamp = now;
+ }
+ } else {
+ flush();
+ msg = newMsg;
+ timestamp = now;
+ LOG.warn(msg);
+ }
+ }
+}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java?rev=1423990&r1=1423989&r2=1423990&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java Wed Dec 19 18:07:14 2012
@@ -227,6 +227,14 @@ public abstract class ServerCnxn impleme
protected final static int isroCmd = ByteBuffer.wrap("isro".getBytes())
.getInt();
+ /*
+ * The control sequence sent by the telnet program when it closes a
+ * connection. Include simply to keep the logs cleaner (the server would
+ * close the connection anyway because it would parse this as a negative
+ * length).
+ */
+ protected final static int telnetCloseCmd = 0xfff4fffd;
+
protected final static HashMap<Integer, String> cmd2String =
new HashMap<Integer, String>();
@@ -248,6 +256,7 @@ public abstract class ServerCnxn impleme
cmd2String.put(wchsCmd, "wchs");
cmd2String.put(mntrCmd, "mntr");
cmd2String.put(isroCmd, "isro");
+ cmd2String.put(telnetCloseCmd, "telnet close");
}
protected void packetReceived() {