You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by GitBox <gi...@apache.org> on 2019/01/14 18:39:18 UTC
[zookeeper] Diff for: [GitHub] asfgit closed pull request #684: ZOOKEEPER-3180: Add response cache to improve the throughput of read …
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 2b68e38268..d808b612a9 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -685,6 +685,17 @@ property, when available, is noted below.
defaults to 1000. This value can only be set as a
system property.
+* *maxResponseCacheSize* :
+ (Java system property: **zookeeper.maxResponseCacheSize**)
+ When set to a positive integer, it determines the maximum
+ number of entries in the cache that stores the serialized
+ form of recent getData responses. Helps save the serialization
+ cost on popular znodes. The metrics **response_packet_cache_hits**
+ and **response_packet_cache_misses** can be used to tune
+ this value to a given workload. The feature is turned on
+ by default with a value of 400; set it to 0 or a negative
+ integer to turn the feature off.
+
* *autopurge.snapRetainCount* :
(No Java system property)
**New in 3.4.0:**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
index f384d7c5c7..1f64dd09dc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/DumbWatcher.java
@@ -26,8 +26,7 @@
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.proto.ReplyHeader;
-import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.ServerStats;
+import org.apache.zookeeper.data.Stat;
/**
* A empty watcher implementation used in bench and unit test.
@@ -58,7 +57,7 @@ public void process(WatchedEvent event) { }
void close() { }
@Override
- public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { }
+ public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat) throws IOException { }
@Override
public void sendCloseSession() { }
@@ -70,7 +69,7 @@ public void sendCloseSession() { }
void setSessionId(long sessionId) { }
@Override
- void sendBuffer(ByteBuffer closeConn) { }
+ void sendBuffer(ByteBuffer... closeConn) { }
@Override
void enableRecv() { }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index b9427e8f40..d022193e73 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -168,6 +168,7 @@ public void processRequest(Request request) {
zks.decInProcess();
Code err = Code.OK;
Record rsp = null;
+ String path = null;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
/*
@@ -316,7 +317,7 @@ public void processRequest(Request request) {
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
- String path = existsRequest.getPath();
+ path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
@@ -330,15 +331,16 @@ public void processRequest(Request request) {
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
- DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
+ path = getDataRequest.getPath();
+ DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
- request.authInfo, getDataRequest.getPath(), null);
+ request.authInfo, path, null);
Stat stat = new Stat();
- byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
+ byte b[] = zks.getZKDatabase().getData(path, stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
@@ -362,8 +364,9 @@ public void processRequest(Request request) {
ByteBufferInputStream.byteBuffer2Record(request.request,
getACLRequest);
Stat stat = new Stat();
+ path = getACLRequest.getPath();
List<ACL> acl =
- zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
+ zks.getZKDatabase().getACL(path, stat);
rsp = new GetACLResponse(acl, stat);
break;
}
@@ -372,15 +375,16 @@ public void processRequest(Request request) {
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildrenRequest);
- DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
+ path = getChildrenRequest.getPath();
+ DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
- request.authInfo, getChildrenRequest.getPath(), null);
+ request.authInfo, path, null);
List<String> children = zks.getZKDatabase().getChildren(
- getChildrenRequest.getPath(), null, getChildrenRequest
+ path, null, getChildrenRequest
.getWatch() ? cnxn : null);
rsp = new GetChildrenResponse(children);
break;
@@ -391,15 +395,16 @@ public void processRequest(Request request) {
ByteBufferInputStream.byteBuffer2Record(request.request,
getChildren2Request);
Stat stat = new Stat();
- DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
+ path = getChildren2Request.getPath();
+ DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
- request.authInfo, getChildren2Request.getPath(), null);
+ request.authInfo, path, null);
List<String> children = zks.getZKDatabase().getChildren(
- getChildren2Request.getPath(), stat, getChildren2Request
+ path, stat, getChildren2Request
.getWatch() ? cnxn : null);
rsp = new GetChildren2Response(children, stat);
break;
@@ -410,11 +415,12 @@ public void processRequest(Request request) {
ByteBufferInputStream.byteBuffer2Record(request.request,
checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
+ path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(
- checkWatches.getPath(), type, cnxn);
+ path, type, cnxn);
if (!containsWatcher) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
- checkWatches.getPath(), type);
+ path, type);
throw new KeeperException.NoWatcherException(msg);
}
break;
@@ -425,11 +431,12 @@ public void processRequest(Request request) {
ByteBufferInputStream.byteBuffer2Record(request.request,
removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
+ path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(
- removeWatches.getPath(), type, cnxn);
+ path, type, cnxn);
if (!removed) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
- removeWatches.getPath(), type);
+ path, type);
throw new KeeperException.NoWatcherException(msg);
}
break;
@@ -468,7 +475,19 @@ public void processRequest(Request request) {
updateStats(request, lastOp, lastZxid);
try {
- cnxn.sendResponse(hdr, rsp, "response");
+ if (request.type == OpCode.getData && path != null && rsp != null) {
+ // Serialized read responses could be cached by the connection object.
+ // Cache entries are keyed by path and validated against the node's
+ // Stat, so both values are passed along with the response.
+ GetDataResponse getDataResponse = (GetDataResponse)rsp;
+ Stat stat = null;
+ if (getDataResponse != null && getDataResponse.getStat() != null) {
+ stat = getDataResponse.getStat();
+ }
+ cnxn.sendResponse(hdr, rsp, "response", path, stat);
+ } else {
+ cnxn.sendResponse(hdr, rsp, "response");
+ }
if (request.type == OpCode.closeSession) {
cnxn.sendCloseSession();
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index b48eb3dc3b..c2ab78487a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -19,7 +19,6 @@
package org.apache.zookeeper.server;
import java.io.BufferedWriter;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.Writer;
@@ -36,10 +35,10 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.jute.BinaryInputArchive;
-import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.WatcherEvent;
@@ -137,12 +136,17 @@ void sendBufferSync(ByteBuffer bb) {
* sendBuffer pushes a byte buffer onto the outgoing buffer queue for
* asynchronous writes.
*/
- public void sendBuffer(ByteBuffer bb) {
+ public void sendBuffer(ByteBuffer... buffers) {
if (LOG.isTraceEnabled()) {
LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
+ " is valid: " + sk.isValid());
}
- outgoingBuffers.add(bb);
+ synchronized (outgoingBuffers) {
+ for (ByteBuffer buffer : buffers) {
+ outgoingBuffers.add(buffer);
+ }
+ outgoingBuffers.add(packetSentinel);
+ }
requestInterestOpsUpdate();
}
@@ -221,10 +225,12 @@ void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
}
+ if (bb == packetSentinel) {
+ packetSent();
+ }
if (bb.remaining() > 0) {
break;
}
- packetSent();
outgoingBuffers.remove();
}
} else {
@@ -269,6 +275,9 @@ void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
if (bb == ServerCnxnFactory.closeConn) {
throw new CloseRequestException("close requested");
}
+ if (bb == packetSentinel) {
+ packetSent();
+ }
if (sent < bb.remaining()) {
/*
* We only partially sent this buffer, so we update
@@ -277,7 +286,6 @@ void handleWrite(SelectionKey k) throws IOException, CloseRequestException {
bb.position(bb.position() + sent);
break;
}
- packetSent();
/* We've sent the whole buffer, so drop the buffer */
sent -= bb.remaining();
outgoingBuffers.remove();
@@ -648,16 +656,34 @@ public static void closeSock(SocketChannel sock) {
}
}
- /*
- * (non-Javadoc)
+ private final static ByteBuffer packetSentinel = ByteBuffer.allocate(0);
+
+ /**
+ * Serializes a ZooKeeper response and enqueues it for sending.
+ *
+ * Serializes client response parts and enqueues them into outgoing queue.
+ *
+ * If both a cache key and a stat are provided, the serialized
+ * response is cached under the provided key, and the stat is
+ * stored along with the value. A cache entry is invalidated if the
+ * provided stat differs from the stored one.
+ *
+ * Attention: this function is not thread safe, due to caching not being
+ * thread safe.
*
- * @see org.apache.zookeeper.server.ServerCnxnIface#sendResponse(org.apache.zookeeper.proto.ReplyHeader,
- * org.apache.jute.Record, java.lang.String)
+ * @param h reply header
+ * @param r reply payload, can be null
+ * @param tag Jute serialization tag, can be null
+ * @param cacheKey key for caching the serialized payload. A null value
+ * prevents caching.
+ * @param stat stat information for the reply payload, used
+ * for cache invalidation. A null value prevents caching.
*/
@Override
- public void sendResponse(ReplyHeader h, Record r, String tag) {
+ public void sendResponse(ReplyHeader h, Record r, String tag,
+ String cacheKey, Stat stat) {
try {
- super.sendResponse(h, r, tag);
+ sendBuffer(serialize(h, r, tag, cacheKey, stat));
decrOutstandingAndCheckThrottle(h);
} catch(Exception e) {
LOG.warn("Unexpected exception. Destruction averted.", e);
@@ -682,7 +708,7 @@ public void process(WatchedEvent event) {
// Convert WatchedEvent to a type that can be sent over the wire
WatcherEvent e = event.getWrapper();
- sendResponse(h, e, "notification");
+ sendResponse(h, e, "notification", null, null);
}
/*
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index 311d3c1d20..b6bb343f49 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -39,6 +39,7 @@
import io.netty.util.ReferenceCountUtil;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
+import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;
@@ -160,12 +161,14 @@ public void process(WatchedEvent event) {
}
@Override
- public void sendResponse(ReplyHeader h, Record r, String tag)
- throws IOException {
+ public void sendResponse(ReplyHeader h, Record r, String tag,
+ String cacheKey, Stat stat) throws IOException {
+ // cacheKey and stat are used in caching, which is not
+ // implemented here. Implementation example can be found in NIOServerCnxn.
if (closingChannel || !channel.isOpen()) {
return;
}
- super.sendResponse(h, r, tag);
+ sendBuffer(serialize(h, r, tag, cacheKey, stat));
decrOutstandingAndCheckThrottle(h);
}
@@ -176,12 +179,12 @@ public void setSessionId(long sessionId) {
}
@Override
- public void sendBuffer(ByteBuffer sendBuffer) {
- if (sendBuffer == ServerCnxnFactory.closeConn) {
+ public void sendBuffer(ByteBuffer... buffers) {
+ if (buffers.length == 1 && buffers[0] == ServerCnxnFactory.closeConn) {
close();
return;
}
- channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(f -> {
+ channel.writeAndFlush(Unpooled.wrappedBuffer(buffers)).addListener(f -> {
if (f.isSuccess()) {
packetSent();
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ResponseCache.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ResponseCache.java
new file mode 100644
index 0000000000..73db7d5802
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ResponseCache.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.zookeeper.data.Stat;
+
+/**
+ * Caches the serialized form of read responses, keyed by znode path.
+ *
+ * <p>Each entry remembers the {@link Stat} its payload was serialized with;
+ * a lookup with a different Stat treats the entry as stale, removes it and
+ * reports a miss. Once more than {@code zookeeper.maxResponseCacheSize}
+ * entries are stored, the least recently accessed one is evicted. A size of
+ * zero or less disables storage entirely.
+ *
+ * <p>Individual operations are synchronized, so one instance may be shared by
+ * all connections of a server.
+ */
+public class ResponseCache {
+    // Magic number chosen to be "big enough but not too big"
+    private static final int DEFAULT_RESPONSE_CACHE_SIZE = 400;
+
+    /** A serialized payload together with the Stat it was produced from. */
+    private static class Entry {
+        public Stat stat;
+        public byte[] data;
+    }
+
+    // Read once so the map's eviction bound and put() agree on the limit.
+    private final int maxCacheSize = getResponseCacheSize();
+
+    private final Map<String, Entry> cache = Collections.synchronizedMap(
+        new LRUCache<String, Entry>(maxCacheSize));
+
+    public ResponseCache() {
+    }
+
+    /**
+     * Stores the serialized payload for {@code path} together with the Stat
+     * it was serialized from. Does nothing when the cache size is zero or
+     * negative.
+     */
+    public void put(String path, byte[] data, Stat stat) {
+        if (maxCacheSize <= 0) {
+            return;
+        }
+        Entry entry = new Entry();
+        entry.data = data;
+        entry.stat = stat;
+        cache.put(path, entry);
+    }
+
+    /**
+     * Returns the cached payload for {@code key} if it was serialized from a
+     * Stat equal to {@code stat}; returns null on a miss. A stale entry is
+     * removed.
+     */
+    public byte[] get(String key, Stat stat) {
+        Entry entry = cache.get(key);
+        if (entry == null) {
+            return null;
+        }
+        if (!stat.equals(entry.stat)) {
+            // The node has been modified, invalidate cache. Remove only the
+            // entry examined here so a fresher one stored concurrently survives.
+            cache.remove(key, entry);
+            return null;
+        }
+        return entry.data;
+    }
+
+    private static int getResponseCacheSize() {
+        return Integer.getInteger("zookeeper.maxResponseCacheSize", DEFAULT_RESPONSE_CACHE_SIZE);
+    }
+
+    /** Returns true when the configured cache size is positive. */
+    public static boolean isEnabled() {
+        return getResponseCacheSize() > 0;
+    }
+
+    /**
+     * Access-ordered LinkedHashMap that evicts its eldest entry once it holds
+     * more than {@code cacheSize} entries.
+     */
+    @SuppressWarnings("serial")
+    private static class LRUCache<K, V> extends LinkedHashMap<K, V> {
+        private final int cacheSize;
+
+        LRUCache(int cacheSize) {
+            // accessOrder=true makes lookups refresh recency (LRU rather than
+            // insertion-order FIFO); the clamp keeps a zero or negative size
+            // from producing an illegal initial capacity.
+            super(Math.max(cacheSize / 4, 1), 0.75f, true);
+            this.cacheSize = cacheSize;
+        }
+
+        @Override
+        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
+            return size() > cacheSize;
+        }
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
index 8e145cbeb1..b0088d1fba 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java
@@ -37,9 +37,11 @@
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
+import org.apache.zookeeper.Quotas;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
import org.slf4j.Logger;
@@ -103,25 +105,64 @@ public void decrOutstandingAndCheckThrottle(ReplyHeader h) {
abstract void close();
+ public abstract void sendResponse(ReplyHeader h, Record r,
+ String tag, String cacheKey, Stat stat) throws IOException;
+
public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // Make space for length
+ sendResponse(h, r, tag, null, null);
+ }
+
+ protected byte[] serializeRecord(Record record) throws IOException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(
+ ZooKeeperServer.intBufferStartingSizeBytes);
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
- try {
- baos.write(fourBytes);
- bos.writeRecord(h, "header");
- if (r != null) {
- bos.writeRecord(r, tag);
+ bos.writeRecord(record, null);
+ return baos.toByteArray();
+ }
+
+ protected ByteBuffer[] serialize(ReplyHeader h, Record r, String tag,
+ String cacheKey, Stat stat) throws IOException {
+ byte[] header = serializeRecord(h);
+ byte[] data = null;
+ if (r != null) {
+ ResponseCache cache = zkServer.getReadResponseCache();
+ if (cache != null && stat != null && cacheKey != null &&
+ !cacheKey.endsWith(Quotas.statNode)) {
+ // Use cache to get serialized data.
+ //
+ // NB: Tag is ignored both during cache lookup and serialization,
+ // since it is not used in read responses, which are being cached.
+ data = cache.get(cacheKey, stat);
+ if (data == null) {
+ // Cache miss, serialize the response and put it in cache.
+ data = serializeRecord(r);
+ cache.put(cacheKey, data, stat);
+ ServerMetrics.RESPONSE_PACKET_CACHE_MISSING.add(1);
+ } else {
+ ServerMetrics.RESPONSE_PACKET_CACHE_HITS.add(1);
+ }
+ } else {
+ data = serializeRecord(r);
}
- baos.close();
- } catch (IOException e) {
- LOG.error("Error serializing response");
}
- byte b[] = baos.toByteArray();
- serverStats().updateClientResponseSize(b.length - 4);
- ByteBuffer bb = ByteBuffer.wrap(b);
- bb.putInt(b.length - 4).rewind();
- sendBuffer(bb);
+ int dataLength = data == null ? 0 : data.length;
+ int packetLength = header.length + dataLength;
+ ServerStats serverStats = serverStats();
+ if (serverStats != null) {
+ serverStats.updateClientResponseSize(packetLength);
+ }
+ ByteBuffer lengthBuffer = ByteBuffer.allocate(4).putInt(packetLength);
+ lengthBuffer.rewind();
+
+ int bufferLen = data != null ? 3 : 2;
+ ByteBuffer[] buffers = new ByteBuffer[bufferLen];
+
+ buffers[0] = lengthBuffer;
+ buffers[1] = ByteBuffer.wrap(header);
+ if (data != null) {
+ buffers[2] = ByteBuffer.wrap(data);
+ }
+ return buffers;
}
/* notify the client the session is closing and close/cleanup socket */
@@ -146,7 +187,7 @@ public boolean removeAuthInfo(Id id) {
return authInfo.remove(id);
}
- abstract void sendBuffer(ByteBuffer closeConn);
+ abstract void sendBuffer(ByteBuffer... buffers);
abstract void enableRecv();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index c5d82deebb..3420b88e8b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -67,7 +67,10 @@
SNAP_COUNT(new SimpleCounter("snap_count")),
COMMIT_COUNT(new SimpleCounter("commit_count")),
CONNECTION_REQUEST_COUNT(new SimpleCounter("connection_request_count")),
- BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count"));
+ BYTES_RECEIVED_COUNT(new SimpleCounter("bytes_received_count")),
+
+ RESPONSE_PACKET_CACHE_HITS(new SimpleCounter("response_packet_cache_hits")),
+ RESPONSE_PACKET_CACHE_MISSING(new SimpleCounter("response_packet_cache_misses"));
private final Metric metric;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 20ab023ec5..833c79bab0 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -106,10 +106,12 @@
protected SessionTracker sessionTracker;
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;
+ private ResponseCache readResponseCache;
private final AtomicLong hzxid = new AtomicLong(0);
public final static Exception ok = new Exception("No prob");
protected RequestProcessor firstProcessor;
protected volatile State state = State.INITIAL;
+    private volatile boolean isResponseCachingEnabled = true; // toggled via JMX, read when serializing responses
protected enum State {
INITIAL, RUNNING, SHUTDOWN, ERROR
@@ -138,6 +140,30 @@
private ZooKeeperServerShutdownHandler zkShutdownHandler;
private volatile int createSessionTrackerServerId = 1;
+    /**
+     * Starting size, in bytes, of the ByteArrayOutputStream buffers used when
+     * serializing records. Defaults to 1024; values below 32 are rejected.
+     * Flag not used for small transfers like connectResponses.
+     */
+    public static final String INT_BUFFER_STARTING_SIZE_BYTES =
+        "zookeeper.intBufferStartingSizeBytes";
+    public static final int DEFAULT_STARTING_BUFFER_SIZE = 1024;
+    public static final int intBufferStartingSizeBytes;
+
+    static {
+        intBufferStartingSizeBytes = Integer.getInteger(
+            INT_BUFFER_STARTING_SIZE_BYTES, DEFAULT_STARTING_BUFFER_SIZE);
+        if (intBufferStartingSizeBytes < 32) {
+            String msg = "Buffer starting size must be at least 32 bytes, got "
+                + intBufferStartingSizeBytes + ". "
+                + "Configure with \"-Dzookeeper.intBufferStartingSizeBytes=<size>\" ";
+            LOG.error(msg);
+            throw new IllegalArgumentException(msg);
+        }
+
+        LOG.info(INT_BUFFER_STARTING_SIZE_BYTES + " = " + intBufferStartingSizeBytes);
+    }
+
void removeCnxn(ServerCnxn cnxn) {
zkDb.removeCnxn(cnxn);
}
@@ -170,6 +196,7 @@ public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
setMinSessionTimeout(minSessionTimeout);
setMaxSessionTimeout(maxSessionTimeout);
listener = new ZooKeeperServerListenerImpl(this);
+ readResponseCache = new ResponseCache();
LOG.info("Created server with tickTime " + tickTime
+ " minSessionTimeout " + getMinSessionTimeout()
+ " maxSessionTimeout " + getMaxSessionTimeout()
@@ -1282,4 +1309,22 @@ private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
void registerServerShutdownHandler(ZooKeeperServerShutdownHandler zkShutdownHandler) {
this.zkShutdownHandler = zkShutdownHandler;
}
+
+    /** Returns whether serialized getData responses may be served from the cache. */
+    public boolean isResponseCachingEnabled() {
+        return isResponseCachingEnabled;
+    }
+
+    /** Enables or disables response caching; also exposed through JMX. */
+    public void setResponseCachingEnabled(boolean isEnabled) {
+        isResponseCachingEnabled = isEnabled;
+    }
+
+    /**
+     * Returns the server-wide read response cache, or null while caching is
+     * disabled so that response serialization bypasses it.
+     */
+    public ResponseCache getReadResponseCache() {
+        return isResponseCachingEnabled ? readResponseCache : null;
+    }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
index cf84b2f9e5..deae98d9b4 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerBean.java
@@ -197,4 +197,14 @@ public int getMinClientResponseSize() {
public int getMaxClientResponseSize() {
return zks.serverStats().getClientResponseStats().getMaxBufferSize();
}
+
+ @Override
+ public boolean getResponseCachingEnabled() {
+ return zks.isResponseCachingEnabled();
+ }
+
+ @Override
+ public void setResponseCachingEnabled(boolean isEnabled) {
+ zks.setResponseCachingEnabled(isEnabled);
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
index feb6875870..bd4d3498d2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServerMXBean.java
@@ -95,6 +95,9 @@
*/
public void setMaxSessionTimeout(int max);
+ public boolean getResponseCachingEnabled();
+ public void setResponseCachingEnabled(boolean isEnabled);
+
/**
* Reset packet and latency statistics
*/
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
index 20cf36dc88..a8fdeaf7dd 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MockServerCnxn.java
@@ -24,6 +24,7 @@
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.data.Stat;
public class MockServerCnxn extends ServerCnxn {
public Certificate[] clientChain;
@@ -43,7 +44,7 @@ void close() {
}
@Override
- public void sendResponse(ReplyHeader h, Record r, String tag)
+ public void sendResponse(ReplyHeader h, Record r, String tag, String cacheKey, Stat stat)
throws IOException {
}
@@ -80,7 +81,7 @@ public void setClientCertificateChain(Certificate[] chain) {
}
@Override
- void sendBuffer(ByteBuffer closeConn) {
+ void sendBuffer(ByteBuffer... closeConn) {
}
@Override
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
index 4edcc0eb12..d8a923a865 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/JMXEnv.java
@@ -19,9 +19,12 @@
package org.apache.zookeeper.test;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.regex.Pattern;
import javax.management.MBeanServer;
import javax.management.MBeanServerConnection;
@@ -319,4 +322,62 @@ private static boolean compare(String bean, String name) {
}
return false;
}
+
+    // ObjectName patterns that getServerBeans() treats as server beans.
+    static final Pattern standaloneRegEx = Pattern.compile(
+        "^org.apache.ZooKeeperService:name0=StandaloneServer_port-?\\d+$"
+    );
+    static final Pattern instanceRegEx = Pattern.compile(
+        "^org.apache.ZooKeeperService:name0=ReplicatedServer_id(\\d+)" +
+        ",name1=replica.(\\d+),name2=(Follower|Leader)$"
+    );
+    static final Pattern observerRegEx = Pattern.compile(
+        "^org.apache.ZooKeeperService:name0=ReplicatedServer_id(-?\\d+)" +
+        ",name1=replica.(-?\\d+),name2=(StandaloneServer_port-?\\d+)$"
+    );
+    static final List<Pattern> beanPatterns = Arrays.asList(standaloneRegEx, instanceRegEx, observerRegEx);
+
+    /**
+     * Returns every bean in the {@code CommonNames.DOMAIN} domain whose
+     * ObjectName matches one of {@link #beanPatterns}; each bean is listed at
+     * most once.
+     *
+     * @throws IOException if querying the MBean server connection fails
+     */
+    public static List<ObjectName> getServerBeans() throws IOException {
+        ArrayList<ObjectName> serverBeans = new ArrayList<>();
+        Set<ObjectName> beans;
+        try {
+            beans = conn().queryNames(
+                    new ObjectName(CommonNames.DOMAIN + ":*"), null);
+        } catch (MalformedObjectNameException e) {
+            throw new RuntimeException(e);
+        }
+        for (ObjectName bean : beans) {
+            String name = bean.toString();
+            LOG.info("bean:" + name);
+            for (Pattern pattern : beanPatterns) {
+                if (pattern.matcher(name).find()) {
+                    serverBeans.add(bean);
+                    // Stop at the first match so a bean is never added twice.
+                    break;
+                }
+            }
+        }
+        return serverBeans;
+    }
+
+    /**
+     * Returns the only server bean reported by {@link #getServerBeans()}.
+     *
+     * @throws RuntimeException if zero or several server beans are found
+     */
+    public static ObjectName getServerBean() throws Exception {
+        List<ObjectName> serverBeans = getServerBeans();
+        if (serverBeans.size() != 1) {
+            throw new RuntimeException("Unable to find one and only one server bean, found: "
+                    + serverBeans);
+        }
+        return serverBeans.get(0);
+    }
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ResponseCacheTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ResponseCacheTest.java
new file mode 100644
index 0000000000..e220c61828
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ResponseCacheTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.Map;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Verifies that getData responses are served from the response cache (and
+ * counted as hits/misses) only while caching is enabled, and that a change to
+ * the node's Stat forces a miss.
+ */
+public class ResponseCacheTest extends ClientBase {
+    protected static final Logger LOG =
+        LoggerFactory.getLogger(ResponseCacheTest.class);
+
+    @Test
+    public void testResponseCache() throws Exception {
+        ZooKeeper zk = createClient();
+
+        try {
+            performCacheTest(zk, "/cache", true);
+            performCacheTest(zk, "/nocache", false);
+        }
+        finally {
+            zk.close();
+        }
+    }
+
+    /** Asserts the cache hit/miss counters reported by ServerMetrics. */
+    private void checkCacheStatus(long expectedHits, long expectedMisses) {
+        Map<String, Object> metrics = ServerMetrics.getAllValues();
+        Assert.assertEquals(expectedHits, metrics.get("response_packet_cache_hits"));
+        Assert.assertEquals(expectedMisses, metrics.get("response_packet_cache_misses"));
+    }
+
+    /**
+     * Creates {@code path}, reads it repeatedly, rewrites it, reads again and
+     * finally adds a child, checking data, Stat and cache counters at each step.
+     * With {@code useCache} false no hits or misses may be recorded.
+     */
+    public void performCacheTest(ZooKeeper zk, String path, boolean useCache) throws Exception {
+        ServerMetrics.resetAll();
+        Stat writeStat = new Stat();
+        Stat readStat = new Stat();
+        byte[] readData = null;
+        int reads = 10;
+        long expectedHits = 0;
+        long expectedMisses = 0;
+
+        getServer(serverFactory).setResponseCachingEnabled(useCache);
+        LOG.info("caching: {}", useCache);
+
+        byte[] writeData = "test1".getBytes();
+        zk.create(path, writeData, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT, writeStat);
+        for (int i = 0; i < reads; ++i) {
+            readData = zk.getData(path, false, readStat);
+            Assert.assertArrayEquals(writeData, readData);
+            Assert.assertEquals(writeStat, readStat);
+        }
+        if (useCache) {
+            // The first read serializes and stores; the rest are hits.
+            expectedMisses += 1;
+            expectedHits += reads - 1;
+        }
+        checkCacheStatus(expectedHits, expectedMisses);
+
+        writeData = "test2".getBytes();
+        writeStat = zk.setData(path, writeData, -1);
+        for (int i = 0; i < reads; ++i) {
+            readData = zk.getData(path, false, readStat);
+            Assert.assertArrayEquals(writeData, readData);
+            Assert.assertEquals(writeStat, readStat);
+        }
+        if (useCache) {
+            expectedMisses += 1;
+            expectedHits += reads - 1;
+        }
+        checkCacheStatus(expectedHits, expectedMisses);
+
+        // Create a child beneath the tested node. This won't change the data of
+        // the tested node, but will change its pzxid. The next read of the tested
+        // node should miss in the cache. The data should still match what was written
+        // before, but the stat information should not.
+        zk.create(path + "/child", "child".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT, null);
+        readData = zk.getData(path, false, readStat);
+        if (useCache) {
+            expectedMisses++;
+        }
+        Assert.assertArrayEquals(writeData, readData);
+        Assert.assertNotEquals(writeStat, readStat);
+        checkCacheStatus(expectedHits, expectedMisses);
+    }
+}
With regards,
Apache Git Services