You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by sy...@apache.org on 2022/10/05 15:56:15 UTC

[zookeeper] branch master updated: ZOOKEEPER-4575: ZooKeeperServer#processPacket take record instead of bytes

This is an automated email from the ASF dual-hosted git repository.

symat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 3daefac37 ZOOKEEPER-4575: ZooKeeperServer#processPacket take record instead of bytes
3daefac37 is described below

commit 3daefac37e8a7b456542c91adea541a938df1214
Author: tison <wa...@gmail.com>
AuthorDate: Wed Oct 5 17:55:54 2022 +0200

    ZOOKEEPER-4575: ZooKeeperServer#processPacket take record instead of bytes
    
    This is the first step mentioned in [ZOOKEEPER-102](https://issues.apache.org/jira/browse/ZOOKEEPER-102) and we should not play with ByteBuffer among the request handling code path.
    
    Author: tison <wa...@gmail.com>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>, Mate Szalay-Beko <sy...@apache.org>
    
    Closes #1905 from tisonkun/request-supplier
---
 .travis.yml                                        | 20 ++---
 .../apache/zookeeper/DeleteContainerRequest.java   | 51 ++++++++++++
 .../org/apache/zookeeper/audit/AuditHelper.java    | 22 ++---
 .../zookeeper/server/ByteBufferRequestRecord.java  | 64 ++++++++++++++
 .../apache/zookeeper/server/ContainerManager.java  |  7 +-
 .../zookeeper/server/FinalRequestProcessor.java    | 54 ++++--------
 .../org/apache/zookeeper/server/NIOServerCnxn.java |  6 +-
 .../apache/zookeeper/server/NettyServerCnxn.java   |  8 +-
 .../zookeeper/server/PrepRequestProcessor.java     | 97 ++++++++--------------
 .../java/org/apache/zookeeper/server/Request.java  | 62 ++++++++------
 .../LearnerSyncRequest.java => RequestRecord.java} | 35 +++++---
 .../zookeeper/server/SimpleRequestRecord.java      | 68 +++++++++++++++
 .../apache/zookeeper/server/ZooKeeperServer.java   | 75 ++++++-----------
 .../apache/zookeeper/server/quorum/Learner.java    |  4 +-
 .../zookeeper/server/quorum/LearnerHandler.java    |  5 +-
 .../server/quorum/LearnerSyncRequest.java          |  6 +-
 .../server/quorum/QuorumZooKeeperServer.java       | 14 ++--
 .../zookeeper/audit/Slf4JAuditLoggerTest.java      |  6 +-
 .../zookeeper/server/CreateContainerTest.java      | 26 +++---
 .../server/FinalRequestProcessorTest.java          | 12 +--
 .../server/MultiOpSessionUpgradeTest.java          |  4 +-
 .../server/PrepRequestProcessorMetricsTest.java    | 10 +--
 .../zookeeper/server/PrepRequestProcessorTest.java | 27 ++----
 .../server/ZooKeeperCriticalThreadMetricsTest.java |  3 +-
 .../quorum/CommitProcessorConcurrencyTest.java     |  3 +-
 .../server/quorum/CommitProcessorMetricsTest.java  |  6 +-
 .../server/quorum/CommitProcessorTest.java         |  5 +-
 .../zookeeper/server/quorum/RaceConditionTest.java |  6 +-
 .../server/quorum/SessionUpgradeQuorumTest.java    |  6 +-
 .../quorum/SyncRequestProcessorMetricTest.java     |  4 +-
 .../zookeeper/test/LeaderSessionTrackerTest.java   |  5 +-
 31 files changed, 415 insertions(+), 306 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 81471d9ed..7fd2dcd02 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,15 +5,14 @@ matrix:
     - os: linux
       arch: arm64
       jdk: openjdk11
+      install: |
+        f=$(which javac)
+        while [[ -L $f ]]; do f=$(readlink $f); done
+        export JAVA_HOME=${f%/bin/*}
+
     - os: linux
       arch: s390x
       jdk: openjdk11
-      addons:
-        apt:
-          update: true
-          packages:
-            - maven
-            - libcppunit-dev
 
 cache:
   directories:
@@ -21,13 +20,10 @@ cache:
 
 addons:
   apt:
+    update: true
     packages:
-    - libcppunit-dev
-
-install:
-  - if [ "${TRAVIS_CPU_ARCH}" == "arm64" ]; then
-     sudo apt-get install maven;
-    fi
+      - maven
+      - libcppunit-dev
 
 script: mvn clean apache-rat:check verify -DskipTests spotbugs:check checkstyle:check -Pfull-build
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java
new file mode 100644
index 000000000..b7fd12689
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/DeleteContainerRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
+
+public class DeleteContainerRequest implements Record {
+    private String path;
+
+    public DeleteContainerRequest() {
+    }
+
+    public DeleteContainerRequest(String path) {
+        this.path = path;
+    }
+
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public void serialize(OutputArchive archive, String tag) throws IOException {
+        archive.writeBuffer(path.getBytes(StandardCharsets.UTF_8), "path");
+    }
+
+    @Override
+    public void deserialize(InputArchive archive, String tag) throws IOException {
+        byte[] bytes = archive.readBuffer("path");
+        path = new String(bytes, StandardCharsets.UTF_8);
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
index 5aca1711f..d6df7d924 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
@@ -20,7 +20,6 @@ package org.apache.zookeeper.audit;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.jute.Record;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.MultiOperationRecord;
@@ -58,7 +57,7 @@ public final class AuditHelper {
         if (!ZKAuditProvider.isAuditEnabled()) {
             return;
         }
-        String op = null;
+        String op;
         //For failed transaction rc.path is null
         String path = txnResult.path;
         String acls = null;
@@ -69,8 +68,7 @@ public final class AuditHelper {
                 case ZooDefs.OpCode.create2:
                 case ZooDefs.OpCode.createContainer:
                     op = AuditConstants.OP_CREATE;
-                    CreateRequest createRequest = new CreateRequest();
-                    deserialize(request, createRequest);
+                    CreateRequest createRequest = request.readRequestRecord(CreateRequest::new);
                     createMode = getCreateMode(createRequest);
                     if (failedTxn) {
                         path = createRequest.getPath();
@@ -80,23 +78,20 @@ public final class AuditHelper {
                 case ZooDefs.OpCode.deleteContainer:
                     op = AuditConstants.OP_DELETE;
                     if (failedTxn) {
-                        DeleteRequest deleteRequest = new DeleteRequest();
-                        deserialize(request, deleteRequest);
+                        DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
                         path = deleteRequest.getPath();
                     }
                     break;
                 case ZooDefs.OpCode.setData:
                     op = AuditConstants.OP_SETDATA;
                     if (failedTxn) {
-                        SetDataRequest setDataRequest = new SetDataRequest();
-                        deserialize(request, setDataRequest);
+                        SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
                         path = setDataRequest.getPath();
                     }
                     break;
                 case ZooDefs.OpCode.setACL:
                     op = AuditConstants.OP_SETACL;
-                    SetACLRequest setACLRequest = new SetACLRequest();
-                    deserialize(request, setACLRequest);
+                    SetACLRequest setACLRequest = request.readRequestRecord(SetACLRequest::new);
                     acls = ZKUtil.aclToString(setACLRequest.getAcl());
                     if (failedTxn) {
                         path = setACLRequest.getPath();
@@ -125,10 +120,6 @@ public final class AuditHelper {
         }
     }
 
-    private static void deserialize(Request request, Record record) throws IOException {
-        request.readRequestRecord(record);
-    }
-
     private static Result getResult(ProcessTxnResult rc, boolean failedTxn) {
         if (failedTxn) {
             return Result.FAILURE;
@@ -191,8 +182,7 @@ public final class AuditHelper {
         if (!ZKAuditProvider.isAuditEnabled()) {
             return createModes;
         }
-        MultiOperationRecord multiRequest = new MultiOperationRecord();
-        deserialize(request, multiRequest);
+        MultiOperationRecord multiRequest = request.readRequestRecord(MultiOperationRecord::new);
         for (Op op : multiRequest) {
             if (op.getType() == ZooDefs.OpCode.create || op.getType() == ZooDefs.OpCode.create2
                     || op.getType() == ZooDefs.OpCode.createContainer) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java
new file mode 100644
index 000000000..5ddae2460
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferRequestRecord.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *uuuuu
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "/RequuuAS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+import org.apache.jute.Record;
+
+public class ByteBufferRequestRecord implements RequestRecord {
+
+    private final ByteBuffer request;
+
+    private volatile Record record;
+
+    public ByteBufferRequestRecord(ByteBuffer request) {
+        this.request = request;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends Record> T readRecord(Supplier<T> constructor) throws IOException {
+        if (record != null) {
+            return (T) record;
+        }
+
+        record = constructor.get();
+        request.rewind();
+        ByteBufferInputStream.byteBuffer2Record(request, record);
+        request.rewind();
+        return (T) record;
+    }
+
+    @Override
+    public byte[] readBytes() {
+        request.rewind();
+        int len = request.remaining();
+        byte[] b = new byte[len];
+        request.get(b);
+        request.rewind();
+        return b;
+    }
+
+    @Override
+    public int limit() {
+        return request.limit();
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java
index 7abac587a..2664348c2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ContainerManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.zookeeper.server;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
@@ -27,6 +25,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import org.apache.zookeeper.DeleteContainerRequest;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.common.Time;
 import org.slf4j.Logger;
@@ -129,8 +128,8 @@ public class ContainerManager {
         for (String containerPath : getCandidates()) {
             long startMs = Time.currentElapsedTime();
 
-            ByteBuffer path = ByteBuffer.wrap(containerPath.getBytes(UTF_8));
-            Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, path, null);
+            DeleteContainerRequest record = new DeleteContainerRequest(containerPath);
+            Request request = new Request(null, 0, 0, ZooDefs.OpCode.deleteContainer, RequestRecord.fromRecord(record), null);
             try {
                 LOG.info("Attempting to delete candidate container: {}", containerPath);
                 postDeleteRequest(request);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
index bc5b019f8..693a6b86e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
@@ -269,8 +269,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.multiRead: {
                 lastOp = "MLTR";
-                MultiOperationRecord multiReadRecord = new MultiOperationRecord();
-                request.readRequestRecord(multiReadRecord);
+                MultiOperationRecord multiReadRecord = request.readRequestRecord(MultiOperationRecord::new);
                 rsp = new MultiResponse();
                 OpResult subResult;
                 for (Op readOp : multiReadRecord) {
@@ -348,8 +347,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.sync: {
                 lastOp = "SYNC";
-                SyncRequest syncRequest = new SyncRequest();
-                request.readRequestRecord(syncRequest);
+                SyncRequest syncRequest = request.readRequestRecord(SyncRequest::new);
                 rsp = new SyncResponse(syncRequest.getPath());
                 requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
                 break;
@@ -363,8 +361,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.exists: {
                 lastOp = "EXIS";
                 // TODO we need to figure out the security requirement for this!
-                ExistsRequest existsRequest = new ExistsRequest();
-                request.readRequestRecord(existsRequest);
+                ExistsRequest existsRequest = request.readRequestRecord(ExistsRequest::new);
                 path = existsRequest.getPath();
                 if (path.indexOf('\0') != -1) {
                     throw new KeeperException.BadArgumentsException();
@@ -376,8 +373,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.getData: {
                 lastOp = "GETD";
-                GetDataRequest getDataRequest = new GetDataRequest();
-                request.readRequestRecord(getDataRequest);
+                GetDataRequest getDataRequest = request.readRequestRecord(GetDataRequest::new);
                 path = getDataRequest.getPath();
                 rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
                 requestPathMetricsCollector.registerRequest(request.type, path);
@@ -385,8 +381,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.setWatches: {
                 lastOp = "SETW";
-                SetWatches setWatches = new SetWatches();
-                request.readRequestRecord(setWatches);
+                SetWatches setWatches = request.readRequestRecord(SetWatches::new);
                 long relativeZxid = setWatches.getRelativeZxid();
                 zks.getZKDatabase()
                    .setWatches(
@@ -401,8 +396,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.setWatches2: {
                 lastOp = "STW2";
-                SetWatches2 setWatches = new SetWatches2();
-                request.readRequestRecord(setWatches);
+                SetWatches2 setWatches = request.readRequestRecord(SetWatches2::new);
                 long relativeZxid = setWatches.getRelativeZxid();
                 zks.getZKDatabase().setWatches(relativeZxid,
                         setWatches.getDataWatches(),
@@ -415,16 +409,14 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.addWatch: {
                 lastOp = "ADDW";
-                AddWatchRequest addWatcherRequest = new AddWatchRequest();
-                request.readRequestRecord(addWatcherRequest);
+                AddWatchRequest addWatcherRequest = request.readRequestRecord(AddWatchRequest::new);
                 zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
                 rsp = new ErrorResponse(0);
                 break;
             }
             case OpCode.getACL: {
                 lastOp = "GETA";
-                GetACLRequest getACLRequest = new GetACLRequest();
-                request.readRequestRecord(getACLRequest);
+                GetACLRequest getACLRequest = request.readRequestRecord(GetACLRequest::new);
                 path = getACLRequest.getPath();
                 DataNode n = zks.getZKDatabase().getNode(path);
                 if (n == null) {
@@ -466,8 +458,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.getChildren: {
                 lastOp = "GETC";
-                GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
-                request.readRequestRecord(getChildrenRequest);
+                GetChildrenRequest getChildrenRequest = request.readRequestRecord(GetChildrenRequest::new);
                 path = getChildrenRequest.getPath();
                 rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
                 requestPathMetricsCollector.registerRequest(request.type, path);
@@ -475,8 +466,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.getAllChildrenNumber: {
                 lastOp = "GETACN";
-                GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
-                request.readRequestRecord(getAllChildrenNumberRequest);
+                GetAllChildrenNumberRequest getAllChildrenNumberRequest = request.readRequestRecord(GetAllChildrenNumberRequest::new);
                 path = getAllChildrenNumberRequest.getPath();
                 DataNode n = zks.getZKDatabase().getNode(path);
                 if (n == null) {
@@ -495,8 +485,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.getChildren2: {
                 lastOp = "GETC";
-                GetChildren2Request getChildren2Request = new GetChildren2Request();
-                request.readRequestRecord(getChildren2Request);
+                GetChildren2Request getChildren2Request = request.readRequestRecord(GetChildren2Request::new);
                 Stat stat = new Stat();
                 path = getChildren2Request.getPath();
                 DataNode n = zks.getZKDatabase().getNode(path);
@@ -517,8 +506,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.checkWatches: {
                 lastOp = "CHKW";
-                CheckWatchesRequest checkWatches = new CheckWatchesRequest();
-                request.readRequestRecord(checkWatches);
+                CheckWatchesRequest checkWatches = request.readRequestRecord(CheckWatchesRequest::new);
                 WatcherType type = WatcherType.fromInt(checkWatches.getType());
                 path = checkWatches.getPath();
                 boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
@@ -531,8 +519,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.removeWatches: {
                 lastOp = "REMW";
-                RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
-                request.readRequestRecord(removeWatches);
+                RemoveWatchesRequest removeWatches = request.readRequestRecord(RemoveWatchesRequest::new);
                 WatcherType type = WatcherType.fromInt(removeWatches.getType());
                 path = removeWatches.getPath();
                 boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
@@ -550,8 +537,7 @@ public class FinalRequestProcessor implements RequestProcessor {
              }
             case OpCode.getEphemerals: {
                 lastOp = "GETE";
-                GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
-                request.readRequestRecord(getEphemerals);
+                GetEphemeralsRequest getEphemerals = request.readRequestRecord(GetEphemeralsRequest::new);
                 String prefixPath = getEphemerals.getPrefixPath();
                 Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
                 List<String> ephemerals = new ArrayList<>();
@@ -585,16 +571,8 @@ public class FinalRequestProcessor implements RequestProcessor {
             // log at error level as we are returning a marshalling
             // error to the user
             LOG.error("Failed to process {}", request, e);
-            StringBuilder sb = new StringBuilder();
-            byte[] payload = request.readRequestBytes();
-            if (payload != null) {
-                for (byte b : payload) {
-                    sb.append(String.format("%02x", (0xff & b)));
-                }
-            } else {
-                sb.append("request buffer is null");
-            }
-            LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
+            String digest = request.requestDigest();
+            LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
             err = Code.MARSHALLINGERROR;
         }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
index 83e7491e5..5ffc81da1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
@@ -42,6 +42,7 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
 import org.apache.zookeeper.server.command.CommandExecutor;
@@ -392,7 +393,10 @@ public class NIOServerCnxn extends ServerCnxn {
     }
 
     protected void readRequest() throws IOException {
-        zkServer.processPacket(this, incomingBuffer);
+        RequestHeader h = new RequestHeader();
+        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, h);
+        RequestRecord request = RequestRecord.fromBytes(incomingBuffer.slice());
+        zkServer.processPacket(this, h, request);
     }
 
     // returns whether we are interested in writing, which is determined
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index ae482ce2b..f95200d56 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -47,6 +47,7 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.command.CommandExecutor;
 import org.apache.zookeeper.server.command.FourLetterCommands;
@@ -478,9 +479,10 @@ public class NettyServerCnxn extends ServerCnxn {
                             throw new IOException("ZK down");
                         }
                         if (initialized) {
-                            // TODO: if zks.processPacket() is changed to take a ByteBuffer[],
-                            // we could implement zero-copy queueing.
-                            zks.processPacket(this, bb);
+                            RequestHeader h = new RequestHeader();
+                            ByteBufferInputStream.byteBuffer2Record(bb, h);
+                            RequestRecord request = RequestRecord.fromBytes(bb.slice());
+                            zks.processPacket(this, h, request);
                         } else {
                             LOG.debug("got conn req request from {}", getRemoteSocketAddress());
                             BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
index 9733a48ac..35293359f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.zookeeper.server;
 
-import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.StringReader;
@@ -36,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.DeleteContainerRequest;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.BadArgumentsException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -101,7 +101,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
      */
     private static boolean failCreate = false;
 
-    LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
+    LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<>();
 
     private final RequestProcessor nextProcessor;
     private final boolean digestEnabled;
@@ -311,13 +311,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
     /**
      * This method will be called inside the ProcessRequestThread, which is a
      * singleton, so there will be a single thread calling this code.
-     *
-     * @param type
-     * @param zxid
-     * @param request
-     * @param record
      */
-    protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
+    protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException, IOException, RequestProcessorException {
         if (request.getHdr() == null) {
             request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
                     Time.currentWallTime(), type));
@@ -328,11 +323,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         case OpCode.create2:
         case OpCode.createTTL:
         case OpCode.createContainer: {
-            pRequest2TxnCreate(type, request, record, deserialize);
+            pRequest2TxnCreate(type, request, record);
             break;
         }
         case OpCode.deleteContainer: {
-            String path = new String(request.readRequestBytes(), UTF_8);
+            DeleteContainerRequest txn = (DeleteContainerRequest) record;
+            String path = txn.getPath();
             String parentPath = getParentPathAndValidate(path);
             ChangeRecord nodeRecord = getRecordForPath(path);
             if (nodeRecord.childCount > 0) {
@@ -359,9 +355,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         case OpCode.delete:
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             DeleteRequest deleteRequest = (DeleteRequest) record;
-            if (deserialize) {
-                request.readRequestRecord(deleteRequest);
-            }
             String path = deleteRequest.getPath();
             String parentPath = getParentPathAndValidate(path);
             ChangeRecord parentRecord = getRecordForPath(parentPath);
@@ -387,9 +380,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         case OpCode.setData:
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             SetDataRequest setDataRequest = (SetDataRequest) record;
-            if (deserialize) {
-                request.readRequestRecord(setDataRequest);
-            }
             path = setDataRequest.getPath();
             validatePath(path, request.sessionId);
             nodeRecord = getRecordForPath(path);
@@ -559,9 +549,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         case OpCode.setACL:
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             SetACLRequest setAclRequest = (SetACLRequest) record;
-            if (deserialize) {
-                request.readRequestRecord(setAclRequest);
-            }
             path = setAclRequest.getPath();
             validatePath(path, request.sessionId);
             List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
@@ -577,8 +564,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             addChangeRecord(nodeRecord);
             break;
         case OpCode.createSession:
-            CreateSessionTxn createSessionTxn = new CreateSessionTxn();
-            request.readRequestRecord(createSessionTxn);
+            CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
             request.setTxn(createSessionTxn);
             // only add the global session tracker but not to ZKDb
             zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
@@ -630,9 +616,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         case OpCode.check:
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
-            if (deserialize) {
-                request.readRequestRecord(checkVersionRequest);
-            }
             path = checkVersionRequest.getPath();
             validatePath(path, request.sessionId);
             nodeRecord = getRecordForPath(path);
@@ -653,11 +636,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
         }
     }
 
-    private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
-        if (deserialize) {
-            request.readRequestRecord(record);
-        }
-
+    private void pRequest2TxnCreate(int type, Request request, Record record) throws IOException, KeeperException {
         int flags;
         String path;
         List<ACL> acl;
@@ -792,39 +771,41 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             case OpCode.createContainer:
             case OpCode.create:
             case OpCode.create2:
-                CreateRequest create2Request = new CreateRequest();
-                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
+                CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
                 break;
             case OpCode.createTTL:
-                CreateTTLRequest createTtlRequest = new CreateTTLRequest();
-                pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
+                CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
                 break;
             case OpCode.deleteContainer:
+                DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
+                break;
             case OpCode.delete:
-                DeleteRequest deleteRequest = new DeleteRequest();
-                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
+                DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
                 break;
             case OpCode.setData:
-                SetDataRequest setDataRequest = new SetDataRequest();
-                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
+                SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
                 break;
             case OpCode.reconfig:
-                ReconfigRequest reconfigRequest = new ReconfigRequest();
-                request.readRequestRecord(reconfigRequest);
-                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
+                ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
                 break;
             case OpCode.setACL:
-                SetACLRequest setAclRequest = new SetACLRequest();
-                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
+                SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
                 break;
             case OpCode.check:
-                CheckVersionRequest checkRequest = new CheckVersionRequest();
-                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
+                CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
+                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
                 break;
             case OpCode.multi:
-                MultiOperationRecord multiRequest = new MultiOperationRecord();
+                MultiOperationRecord multiRequest;
                 try {
-                    request.readRequestRecord(multiRequest);
+                    multiRequest = request.readRequestRecord(MultiOperationRecord::new);
                 } catch (IOException e) {
                     request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
                     throw e;
@@ -854,7 +835,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                     } else {
                         /* Prep the request and convert to a Txn */
                         try {
-                            pRequest2Txn(op.getType(), zxid, request, subrequest, false);
+                            pRequest2Txn(op.getType(), zxid, request, subrequest);
                             type = op.getType();
                             txn = request.getTxn();
                         } catch (KeeperException e) {
@@ -899,7 +880,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             case OpCode.createSession:
             case OpCode.closeSession:
                 if (!request.isLocalSession()) {
-                    pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
+                    pRequest2Txn(request.type, zks.getNextZxid(), request, null);
                 }
                 break;
 
@@ -944,20 +925,14 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             // log at error level as we are returning a marshalling
             // error to the user
             LOG.error("Failed to process {}", request, e);
-            StringBuilder sb = new StringBuilder();
-            byte[] payload = request.readRequestBytes();
-            if (payload != null) {
-                for (byte b : payload) {
-                    sb.append(String.format("%02x", (0xff & b)));
-                }
-            } else {
-                sb.append("request buffer is null");
-            }
-            LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
-            if (request.getHdr() != null) {
-                request.getHdr().setType(OpCode.error);
-                request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
+            String digest = request.requestDigest();
+            LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
+            if (request.getHdr() == null) {
+                request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type));
             }
+
+            request.getHdr().setType(OpCode.error);
+            request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
         }
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
index 1aee6aee2..86a50fc55 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
@@ -22,6 +22,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.function.Supplier;
 import org.apache.jute.Record;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
@@ -52,12 +53,12 @@ public class Request {
     // associated session timeout. Disabled by default.
     private static volatile boolean staleLatencyCheck = Boolean.parseBoolean(System.getProperty("zookeeper.request_stale_latency_check", "false"));
 
-    public Request(ServerCnxn cnxn, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
+    public Request(ServerCnxn cnxn, long sessionId, int xid, int type, RequestRecord request, List<Id> authInfo) {
         this.cnxn = cnxn;
         this.sessionId = sessionId;
         this.cxid = xid;
         this.type = type;
-        this.request = bb;
+        this.request = request;
         this.authInfo = authInfo;
     }
 
@@ -79,30 +80,42 @@ public class Request {
 
     public final int type;
 
-    private final ByteBuffer request;
+    private final RequestRecord request;
 
-    public void readRequestRecord(Record record) throws IOException {
+    public <T extends Record> T readRequestRecord(Supplier<T> constructor) throws IOException {
         if (request != null) {
-            request.rewind();
-            ByteBufferInputStream.byteBuffer2Record(request, record);
-            request.rewind();
-            return;
+            return request.readRecord(constructor);
         }
         throw new IOException(new NullPointerException("request"));
     }
 
+    public <T extends Record> T readRequestRecordNoException(Supplier<T> constructor) {
+        try {
+            return readRequestRecord(constructor);
+        } catch (IOException e) {
+            return null;
+        }
+    }
+
     public byte[] readRequestBytes() {
         if (request != null) {
-            request.rewind();
-            int len = request.remaining();
-            byte[] b = new byte[len];
-            request.get(b);
-            request.rewind();
-            return b;
+            return request.readBytes();
         }
         return null;
     }
 
+    public String requestDigest() {
+        if (request != null) {
+            final StringBuilder sb = new StringBuilder();
+            final byte[] payload = request.readBytes();
+            for (byte b : payload) {
+                sb.append(String.format("%02x", (0xff & b)));
+            }
+            return sb.toString();
+        }
+        return "request buffer is null";
+    }
+
     public final ServerCnxn cnxn;
 
     private TxnHeader hdr;
@@ -423,18 +436,19 @@ public class Request {
             && type != OpCode.setWatches
             && type != OpCode.setWatches2
             && type != OpCode.closeSession
-            && request != null
-            && request.remaining() >= 4) {
+            && request != null) {
             try {
                 // make sure we don't mess with request itself
-                ByteBuffer rbuf = request.asReadOnlyBuffer();
-                rbuf.clear();
-                int pathLen = rbuf.getInt();
-                // sanity check
-                if (pathLen >= 0 && pathLen < 4096 && rbuf.remaining() >= pathLen) {
-                    byte[] b = new byte[pathLen];
-                    rbuf.get(b);
-                    path = new String(b, UTF_8);
+                byte[] bytes = request.readBytes();
+                if (bytes != null && bytes.length >= 4) {
+                    ByteBuffer buf = ByteBuffer.wrap(bytes);
+                    int pathLen = buf.getInt();
+                    // sanity check
+                    if (pathLen >= 0 && pathLen < 4096 && buf.remaining() >= pathLen) {
+                        byte[] b = new byte[pathLen];
+                        buf.get(b);
+                        path = new String(b, UTF_8);
+                    }
                 }
             } catch (Exception e) {
                 // ignore - can't find the path, will output "n/a" instead
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java
similarity index 52%
copy from zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
copy to zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java
index d4c83aeab..6265f168e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/RequestRecord.java
@@ -8,28 +8,39 @@
  * with the License.  You may obtain a copy of the License at
  *
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ *uuuuu
  * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
+ * distributed under the License is distributed on an "/RequuuAS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
 
-package org.apache.zookeeper.server.quorum;
+package org.apache.zookeeper.server;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.server.Request;
+import java.util.function.Supplier;
+import org.apache.jute.Record;
+
+public interface RequestRecord {
+
+    static RequestRecord fromBytes(ByteBuffer buffer) {
+        return new ByteBufferRequestRecord(buffer);
+    }
 
-public class LearnerSyncRequest extends Request {
+    static RequestRecord fromBytes(byte[] bytes) {
+        return fromBytes(ByteBuffer.wrap(bytes));
+    }
 
-    LearnerHandler fh;
-    public LearnerSyncRequest(
-        LearnerHandler fh, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
-        super(null, sessionId, xid, type, bb, authInfo);
-        this.fh = fh;
+    static RequestRecord fromRecord(Record record) {
+        return new SimpleRequestRecord(record);
     }
 
+    <T extends Record> T readRecord(Supplier<T> clazz) throws IOException;
+
+    byte[] readBytes();
+
+    int limit();
+
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java
new file mode 100644
index 000000000..a1c78ddad
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/SimpleRequestRecord.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *uuuuu
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "/RequuuAS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.util.function.Supplier;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+
+public class SimpleRequestRecord implements RequestRecord {
+
+    private final Record record;
+
+    private volatile byte[] bytes;
+
+    public SimpleRequestRecord(Record record) {
+        this.record = record;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public <T extends Record> T readRecord(Supplier<T> constructor) {
+        return (T) record;
+    }
+
+    @SuppressFBWarnings("EI_EXPOSE_REP")
+    @Override
+    public byte[] readBytes() {
+        if (bytes != null) {
+            return bytes;
+        }
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+            record.serialize(boa, "request");
+            bytes = baos.toByteArray();
+            return bytes;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    @Override
+    public int limit() {
+        byte[] bytes = readBytes();
+        return ByteBuffer.wrap(bytes).limit();
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 817e84b3e..f6c2b93eb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -22,7 +22,6 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -39,7 +38,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
 import javax.security.sasl.SaslException;
-import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.Environment;
@@ -276,7 +274,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     // Connection throttling
-    private BlueThrottle connThrottle = new BlueThrottle();
+    private final BlueThrottle connThrottle = new BlueThrottle();
 
     @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification =
         "Internally the throttler has a BlockingQueue so "
@@ -290,17 +288,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * too many large requests such that the JVM runs out of usable heap and
      * ultimately crashes.
      *
-     * The limit is enforced by the {@link checkRequestSize(int, boolean)}
+     * The limit is enforced by the {@link #checkRequestSizeWhenReceivingMessage(int)}
      * method which is called by the connection layer ({@link NIOServerCnxn},
      * {@link NettyServerCnxn}) before allocating a byte buffer and pulling
      * data off the TCP socket. The limit is then checked again by the
-     * ZooKeeper server in {@link processPacket(ServerCnxn, ByteBuffer)} which
-     * also atomically updates {@link currentLargeRequestBytes}. The request is
+     * ZooKeeper server in {@link #processPacket(ServerCnxn, RequestHeader, RequestRecord)} which
+     * also atomically updates {@link #currentLargeRequestBytes}. The request is
      * then marked as a large request, with the request size stored in the Request
-     * object so that it can later be decremented from {@link currentLargeRequestsBytes}.
+     * object so that it can later be decremented from {@link #currentLargeRequestBytes}.
      *
      * When a request is completed or dropped, the relevant code path calls the
-     * {@link requestFinished(Request)} method which performs the decrement if
+     * {@link #requestFinished(Request)} method which performs the decrement if
      * needed.
      */
     private volatile int largeRequestMaxBytes = 100 * 1024 * 1024;
@@ -313,7 +311,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
     private final AtomicInteger currentLargeRequestBytes = new AtomicInteger(0);
 
-    private AuthenticationHelper authHelper;
+    private final AuthenticationHelper authHelper = new AuthenticationHelper();
 
     void removeCnxn(ServerCnxn cnxn) {
         zkDb.removeCnxn(cnxn);
@@ -329,7 +327,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         listener = new ZooKeeperServerListenerImpl(this);
         serverStats = new ServerStats(this);
         this.requestPathMetricsCollector = new RequestPathMetricsCollector();
-        this.authHelper = new AuthenticationHelper();
     }
 
     /**
@@ -371,8 +368,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
         this.initLargeRequestThrottlingSettings();
 
-        this.authHelper = new AuthenticationHelper();
-
         LOG.info(
             "Created server with"
                 + " tickTime {} ms"
@@ -1015,10 +1010,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         long sessionId = sessionTracker.createSession(timeout);
         Random r = new Random(sessionId ^ superSecret);
         r.nextBytes(passwd);
-        ByteBuffer to = ByteBuffer.allocate(4);
-        to.putInt(timeout);
+        CreateSessionTxn txn = new CreateSessionTxn(timeout);
         cnxn.setSessionId(sessionId);
-        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
+        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
         submitRequest(si);
         return sessionId;
     }
@@ -1595,13 +1589,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
-    public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
-        // We have the request, now process and setup for next
-        InputStream bais = new ByteBufferInputStream(incomingBuffer);
-        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
-        RequestHeader h = new RequestHeader();
-        h.deserialize(bia, "header");
-
+    public void processPacket(ServerCnxn cnxn, RequestHeader h, RequestRecord request) throws IOException {
         // Need to increase the outstanding request count first, otherwise
         // there might be a race condition that it enabled recv after
         // processing request and then disabled when check throttling.
@@ -1613,14 +1601,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         // in cnxn, since it will close the cnxn anyway.
         cnxn.incrOutstandingAndCheckThrottle(h);
 
-        // Through the magic of byte buffers, txn will not be
-        // pointing
-        // to the start of the txn
-        incomingBuffer = incomingBuffer.slice();
         if (h.getType() == OpCode.auth) {
             LOG.info("got auth packet {}", cnxn.getRemoteSocketAddress());
-            AuthPacket authPacket = new AuthPacket();
-            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
+            AuthPacket authPacket = request.readRecord(AuthPacket::new);
             String scheme = authPacket.getScheme();
             ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
             Code authReturn = KeeperException.Code.AUTHFAILED;
@@ -1660,15 +1643,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             }
             return;
         } else if (h.getType() == OpCode.sasl) {
-            processSasl(incomingBuffer, cnxn, h);
+            processSasl(request, cnxn, h);
         } else {
             if (!authHelper.enforceAuthentication(cnxn, h.getXid())) {
                 // Authentication enforcement is failed
                 // Already sent response to user about failure and closed the session, lets return
                 return;
             } else {
-                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
-                int length = incomingBuffer.limit();
+                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), request, cnxn.getAuthInfo());
+                int length = request.limit();
                 if (isLargeRequest(length)) {
                     // checkRequestSize will throw IOException if request is rejected
                     checkRequestSizeWhenMessageReceived(length);
@@ -1706,10 +1689,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return Boolean.getBoolean(ALLOW_SASL_FAILED_CLIENTS);
     }
 
-    private void processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
+    private void processSasl(RequestRecord request, ServerCnxn cnxn, RequestHeader requestHeader) throws IOException {
         LOG.debug("Responding to client SASL token.");
-        GetSASLRequest clientTokenRecord = new GetSASLRequest();
-        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, clientTokenRecord);
+        GetSASLRequest clientTokenRecord = request.readRecord(GetSASLRequest::new);
         byte[] clientToken = clientTokenRecord.getToken();
         LOG.debug("Size of client SASL token: {}", clientToken.length);
         byte[] responseToken = null;
@@ -2180,8 +2162,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         switch (request.type) {
         case OpCode.create:
         case OpCode.create2: {
-            CreateRequest req = new CreateRequest();
-            if (readRequestRecord(request, req)) {
+            CreateRequest req = request.readRequestRecordNoException(CreateRequest::new);
+            if (req != null) {
                 mustCheckACL = true;
                 acl = req.getAcl();
                 path = parentPath(req.getPath());
@@ -2189,22 +2171,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             break;
         }
         case OpCode.delete: {
-            DeleteRequest req = new DeleteRequest();
-            if (readRequestRecord(request, req)) {
+            DeleteRequest req = request.readRequestRecordNoException(DeleteRequest::new);
+            if (req != null) {
                 path = parentPath(req.getPath());
             }
             break;
         }
         case OpCode.setData: {
-            SetDataRequest req = new SetDataRequest();
-            if (readRequestRecord(request, req)) {
+            SetDataRequest req = request.readRequestRecordNoException(SetDataRequest::new);
+            if (req != null) {
                 path = req.getPath();
             }
             break;
         }
         case OpCode.setACL: {
-            SetACLRequest req = new SetACLRequest();
-            if (readRequestRecord(request, req)) {
+            SetACLRequest req = request.readRequestRecordNoException(SetACLRequest::new);
+            if (req != null) {
                 mustCheckACL = true;
                 acl = req.getAcl();
                 path = req.getPath();
@@ -2298,15 +2280,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return err == KeeperException.Code.OK.intValue();
     }
 
-    private boolean readRequestRecord(Request request, Record record) {
-        try {
-            request.readRequestRecord(record);
-            return true;
-        } catch (IOException ex) {
-            return false;
-        }
-    }
-
     public int getOutstandingHandshakeNum() {
         if (serverCnxnFactory instanceof NettyServerCnxnFactory) {
             return ((NettyServerCnxnFactory) serverCnxnFactory).getOutstandingHandshakeNum();
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index b6eeb758a..1818bf9bb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -804,9 +804,7 @@ public class Learner {
                     continue;
                 }
                 packetsCommitted.remove();
-                Request request = new Request(null, p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), null, null);
-                request.setTxn(p.rec);
-                request.setHdr(p.hdr);
+                Request request = new Request(p.hdr.getClientId(), p.hdr.getCxid(), p.hdr.getType(), p.hdr, p.rec, -1);
                 request.setTxnDigest(p.digest);
                 ozk.commitRequest(request);
             }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
index 259d9fff9..eea11d33a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java
@@ -43,6 +43,7 @@ import org.apache.jute.BinaryOutputArchive;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.TxnLogProposalIterator;
 import org.apache.zookeeper.server.ZKDatabase;
@@ -702,9 +703,9 @@ public class LearnerHandler extends ZooKeeperThread {
                     bb = bb.slice();
                     Request si;
                     if (type == OpCode.sync) {
-                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
+                        si = new LearnerSyncRequest(this, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
                     } else {
-                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
+                        si = new Request(null, sessionId, cxid, type, RequestRecord.fromBytes(bb), qp.getAuthinfo());
                     }
                     si.setOwner(this);
                     learnerMaster.submitLearnerRequest(si);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
index d4c83aeab..6892d3dd8 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSyncRequest.java
@@ -18,17 +18,17 @@
 
 package org.apache.zookeeper.server.quorum;
 
-import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
 
 public class LearnerSyncRequest extends Request {
 
     LearnerHandler fh;
     public LearnerSyncRequest(
-        LearnerHandler fh, long sessionId, int xid, int type, ByteBuffer bb, List<Id> authInfo) {
-        super(null, sessionId, xid, type, bb, authInfo);
+        LearnerHandler fh, long sessionId, int xid, int type, RequestRecord request, List<Id> authInfo) {
+        super(null, sessionId, xid, type, request, authInfo);
         this.fh = fh;
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
index 2f24347b7..240936956 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
@@ -20,7 +20,6 @@ package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.nio.ByteBuffer;
 import java.util.Objects;
 import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
@@ -32,10 +31,12 @@ import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.CreateSessionTxn;
 
 /**
  * Abstract base class for all ZooKeeperServers that participate in
@@ -76,8 +77,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
         }
 
         if (OpCode.multi == request.type) {
-            MultiOperationRecord multiTransactionRecord = new MultiOperationRecord();
-            request.readRequestRecord(multiTransactionRecord);
+            MultiOperationRecord multiTransactionRecord = request.readRequestRecord(MultiOperationRecord::new);
             boolean containsEphemeralCreate = false;
             for (Op op : multiTransactionRecord) {
                 if (op.getType() == OpCode.create || op.getType() == OpCode.create2) {
@@ -93,8 +93,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
                 return null;
             }
         } else {
-            CreateRequest createRequest = new CreateRequest();
-            request.readRequestRecord(createRequest);
+            CreateRequest createRequest = request.readRequestRecord(CreateRequest::new);
             CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
             if (!createMode.isEphemeral()) {
                 return null;
@@ -116,9 +115,8 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
         synchronized (upgradeableSessionTracker) {
             if (upgradeableSessionTracker.isLocalSession(sessionId)) {
                 int timeout = upgradeableSessionTracker.upgradeSession(sessionId);
-                ByteBuffer to = ByteBuffer.allocate(4);
-                to.putInt(timeout);
-                return new Request(null, sessionId, 0, OpCode.createSession, to, null);
+                CreateSessionTxn txn = new CreateSessionTxn(timeout);
+                return new Request(null, sessionId, 0, OpCode.createSession, RequestRecord.fromRecord(txn), null);
             }
         }
         return null;
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java
index df3b831a3..8a700bbdf 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/audit/Slf4JAuditLoggerTest.java
@@ -39,9 +39,9 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.audit.AuditEvent.Result;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.server.util.AuthUtil;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
 import org.apache.zookeeper.test.LoggerTestTool;
@@ -290,9 +290,7 @@ public class Slf4JAuditLoggerTest extends QuorumPeerTestBase {
 
     private String getUser() {
         ServerCnxn next = getServerCnxn();
-        Request request = new Request(next, -1, -1, -1, null,
-                next.getAuthInfo());
-        return request.getUsersForAudit();
+        return AuthUtil.getUsers(next.getAuthInfo());
     }
 
     private String getIp() {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
index f9fd6d8b5..0e6c1b58f 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
@@ -19,10 +19,12 @@
 package org.apache.zookeeper.server;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -36,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.DeleteContainerRequest;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Op;
 import org.apache.zookeeper.ZooDefs;
@@ -91,7 +94,7 @@ public class CreateContainerTest extends ClientBase {
         Stat stat = createWithStatVerifyResult("/foo");
         Stat childStat = createWithStatVerifyResult("/foo/child");
         // Don't expect to get the same stats for different creates.
-        assertFalse(stat.equals(childStat));
+        assertNotEquals(stat, childStat);
     }
 
     @SuppressWarnings("ConstantConditions")
@@ -224,7 +227,11 @@ public class CreateContainerTest extends ClientBase {
         RequestProcessor processor = new RequestProcessor() {
             @Override
             public void processRequest(Request request) {
-                queue.add(new String(request.readRequestBytes()));
+                try {
+                    queue.add(request.readRequestRecord(DeleteContainerRequest::new).getPath());
+                } catch (IOException e) {
+                    fail(e);
+                }
             }
 
             @Override
@@ -246,14 +253,13 @@ public class CreateContainerTest extends ClientBase {
             containerManager.checkContainers();
             return null;
         });
-        assertEquals(queue.poll(5, TimeUnit.SECONDS), "/one");
-        assertEquals(queue.poll(5, TimeUnit.SECONDS), "/two");
-        assertEquals(queue.size(), 0);
+        assertEquals("/one", queue.poll(5, TimeUnit.SECONDS));
+        assertEquals("/two", queue.poll(5, TimeUnit.SECONDS));
+        assertEquals(0, queue.size());
         Thread.sleep(500);
-        assertEquals(queue.size(), 0);
-
-        assertEquals(queue.poll(5, TimeUnit.SECONDS), "/three");
-        assertEquals(queue.poll(5, TimeUnit.SECONDS), "/four");
+        assertEquals(0, queue.size());
+        assertEquals("/three", queue.poll(5, TimeUnit.SECONDS));
+        assertEquals("/four", queue.poll(5, TimeUnit.SECONDS));
     }
 
     @Test
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java
index 0181d2ebe..4b47d6cbb 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/FinalRequestProcessorTest.java
@@ -96,7 +96,7 @@ public class FinalRequestProcessorTest {
         // Arrange
 
         // Act
-        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, new ArrayList<Id>());
+        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), new ArrayList<Id>());
         processor.processRequest(r);
 
         // Assert
@@ -109,7 +109,7 @@ public class FinalRequestProcessorTest {
         testACLs.remove(2);
 
         // Act
-        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, new ArrayList<Id>());
+        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), new ArrayList<Id>());
         processor.processRequest(r);
 
         // Assert
@@ -123,7 +123,7 @@ public class FinalRequestProcessorTest {
         authInfo.add(new Id("digest", "otheruser:somesecrethash"));
 
         // Act
-        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo);
+        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo);
         processor.processRequest(r);
 
         // Assert
@@ -137,7 +137,7 @@ public class FinalRequestProcessorTest {
         authInfo.add(new Id("digest", "user:secrethash"));
 
         // Act
-        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo);
+        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo);
         processor.processRequest(r);
 
         // Assert
@@ -151,7 +151,7 @@ public class FinalRequestProcessorTest {
         authInfo.add(new Id("digest", "adminuser:adminsecret"));
 
         // Act
-        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo);
+        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo);
         processor.processRequest(r);
 
         // Assert
@@ -167,7 +167,7 @@ public class FinalRequestProcessorTest {
         authInfo.add(new Id("digest", "adminuser:adminsecret"));
 
         // Act
-        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, bb, authInfo);
+        Request r = new Request(cnxn, 0, 0, ZooDefs.OpCode.getACL, RequestRecord.fromBytes(bb), authInfo);
         processor.processRequest(r);
 
         // Assert
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
index 001e23635..682691010 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
@@ -117,7 +117,7 @@ public class MultiOpSessionUpgradeTest extends QuorumBase {
         GetDataRequest getDataRequest = new GetDataRequest(path, false);
         getDataRequest.serialize(boa, "request");
         ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
-        return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList<Id>());
+        return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, RequestRecord.fromBytes(bb), new ArrayList<Id>());
     }
 
     private Request makeCreateRequest(String path, long sessionId) throws IOException {
@@ -126,7 +126,7 @@ public class MultiOpSessionUpgradeTest extends QuorumBase {
         CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
         createRequest.serialize(boa, "request");
         ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
-        return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<Id>());
+        return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, RequestRecord.fromBytes(bb), new ArrayList<Id>());
     }
 
     private QuorumZooKeeperServer getConnectedServer(long sessionId) {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
index 062cae1f9..bfb0db5f3 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java
@@ -29,15 +29,12 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
@@ -94,12 +91,7 @@ public class PrepRequestProcessorMetricsTest extends ZKTestCase {
     }
 
     private Request createRequest(Record record, int opCode) throws IOException {
-        // encoding
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-        record.serialize(boa, "request");
-        baos.close();
-        return new Request(null, 1L, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), null);
+        return new Request(null, 1L, 0, opCode, RequestRecord.fromRecord(record), null);
     }
 
     private Request createRequest(String path, int opCode) throws IOException {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
index 9e7120569..d80c5e08e 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java
@@ -24,11 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.io.IOException;
 import java.io.PrintWriter;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,7 +34,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -66,13 +62,10 @@ import org.apache.zookeeper.txn.ErrorTxn;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 public class PrepRequestProcessorTest extends ClientBase {
 
-    private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorTest.class);
     private static final int CONNECTION_TIMEOUT = 3000;
     private static String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
     private CountDownLatch pLatch;
@@ -120,34 +113,28 @@ public class PrepRequestProcessorTest extends ClientBase {
     public void testPRequest() throws Exception {
         pLatch = new CountDownLatch(1);
         processor = new PrepRequestProcessor(zks, new MyRequestProcessor());
-        Request foo = new Request(null, 1L, 1, OpCode.create, ByteBuffer.allocate(3), null);
+        Request foo = new Request(null, 1L, 1, OpCode.create, RequestRecord.fromBytes(new byte[3]), null);
         processor.pRequest(foo);
 
         assertEquals(new ErrorTxn(KeeperException.Code.MARSHALLINGERROR.intValue()), outcome.getTxn(), "Request should have marshalling error");
         assertTrue(pLatch.await(5, TimeUnit.SECONDS), "request hasn't been processed in chain");
     }
 
-    private Request createRequest(Record record, int opCode) throws IOException {
+    private Request createRequest(Record record, int opCode) {
         return createRequest(record, opCode, 1L);
     }
 
-    private Request createRequest(Record record, int opCode, long sessionId) throws IOException {
+    private Request createRequest(Record record, int opCode, long sessionId) {
         return createRequest(record, opCode, sessionId, false);
     }
 
-    private Request createRequest(Record record, int opCode, boolean admin) throws IOException {
+    private Request createRequest(Record record, int opCode, boolean admin) {
         return createRequest(record, opCode, 1L, admin);
     }
 
-    private Request createRequest(Record record, int opCode, long sessionId, boolean admin) throws IOException {
-        // encoding
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-        record.serialize(boa, "request");
-        baos.close();
-        // Id
-        List<Id> ids = Arrays.asList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE);
-        return new Request(null, sessionId, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), ids);
+    private Request createRequest(Record record, int opCode, long sessionId, boolean admin) {
+        List<Id> ids = Collections.singletonList(admin ? new Id("super", "super user") : Ids.ANYONE_ID_UNSAFE);
+        return new Request(null, sessionId, 0, opCode, RequestRecord.fromRecord(record), ids);
     }
 
     private void process(List<Op> ops) throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java
index 16c7a7dd1..681e8348b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperCriticalThreadMetricsTest.java
@@ -19,7 +19,6 @@
 package org.apache.zookeeper.server;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import org.apache.zookeeper.ZKTestCase;
@@ -67,7 +66,7 @@ public class ZooKeeperCriticalThreadMetricsTest extends ZKTestCase {
         PrepRequestProcessor processor = new MyPrepRequestProcessor();
         processor.start();
 
-        processor.processRequest(new Request(null, 1L, 1, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null));
+        processor.processRequest(new Request(null, 1L, 1, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null));
         processed.await();
 
         processor.shutdown();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
index 1050a474b..e5f668d1f 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorConcurrencyTest.java
@@ -43,6 +43,7 @@ import org.apache.zookeeper.proto.GetDataRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.WorkerService;
 import org.apache.zookeeper.server.ZooKeeperServerListener;
 import org.junit.jupiter.api.AfterEach;
@@ -129,7 +130,7 @@ public class CommitProcessorConcurrencyTest extends ZKTestCase {
         BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
         rec.serialize(boa, "request");
         ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
-        return new Request(null, sessionId, xid, type, bb, new ArrayList<Id>());
+        return new Request(null, sessionId, xid, type, RequestRecord.fromBytes(bb), new ArrayList<Id>());
     }
 
     /**
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
index 4b67f5b50..4a4598355 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorMetricsTest.java
@@ -22,7 +22,6 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -31,6 +30,7 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.WorkerService;
 import org.junit.jupiter.api.AfterEach;
@@ -192,11 +192,11 @@ public class CommitProcessorMetricsTest extends ZKTestCase {
     }
 
     private Request createReadRequest(long sessionId, int xid) {
-        return new Request(null, sessionId, xid, ZooDefs.OpCode.getData, ByteBuffer.wrap(new byte[10]), null);
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.getData, RequestRecord.fromBytes(new byte[10]), null);
     }
 
     private Request createWriteRequest(long sessionId, int xid) {
-        return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null);
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null);
     }
 
     private void processRequestWithWait(Request request) throws Exception {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
index 46958d194..46a4c3874 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CommitProcessorTest.java
@@ -42,6 +42,7 @@ import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.jupiter.api.AfterEach;
@@ -160,7 +161,7 @@ public class CommitProcessorTest extends ZKTestCase {
                                                         + (++nodeId), new byte[0], Ids.OPEN_ACL_UNSAFE, 1);
             createReq.serialize(boa, "request");
             ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
-            Request req = new Request(null, sessionId, ++cxid, OpCode.create, bb, new ArrayList<Id>());
+            Request req = new Request(null, sessionId, ++cxid, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList<Id>());
             zks.getFirstProcessor().processRequest(req);
 
         }
@@ -174,7 +175,7 @@ public class CommitProcessorTest extends ZKTestCase {
                                                                + nodeId, false);
             getDataRequest.serialize(boa, "request");
             ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
-            Request req = new Request(null, sessionId, ++cxid, OpCode.getData, bb, new ArrayList<Id>());
+            Request req = new Request(null, sessionId, ++cxid, OpCode.getData, RequestRecord.fromBytes(bb), new ArrayList<Id>());
             zks.getFirstProcessor().processRequest(req);
         }
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java
index ef1f12194..5216eb703 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/RaceConditionTest.java
@@ -23,7 +23,6 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.IOException;
 import java.net.SocketException;
-import java.nio.ByteBuffer;
 import javax.security.sasl.SaslException;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZooDefs;
@@ -32,11 +31,13 @@ import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.txn.DeleteTxn;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -220,7 +221,8 @@ public class RaceConditionTest extends QuorumPeerTestBase {
              * Add a request so that something is there for SyncRequestProcessor
              * to process, while we are in shutdown flow
              */
-            Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, ByteBuffer.wrap("/deadLockIssue".getBytes()), null);
+            DeleteTxn deleteTxn = new DeleteTxn("/deadLockIssue");
+            Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete, RequestRecord.fromRecord(deleteTxn), null);
             processRequest(request);
             super.shutdown();
         }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
index 8803a73cb..3d60c8b91 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
@@ -40,6 +40,7 @@ import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.jupiter.api.AfterEach;
@@ -317,8 +318,7 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
                             }
 
                             if (request.type == ZooDefs.OpCode.create && request.cnxn != null) {
-                                CreateRequest createRequest = new CreateRequest();
-                                request.readRequestRecord(createRequest);
+                                CreateRequest createRequest = request.readRequestRecord(CreateRequest::new);
                                 try {
                                     CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
                                     if (createMode.isEphemeral()) {
@@ -355,7 +355,7 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
         CreateRequest createRequest = new CreateRequest(path, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
         createRequest.serialize(boa, "request");
         ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
-        return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<Id>());
+        return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, RequestRecord.fromBytes(bb), new ArrayList<Id>());
     }
 
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
index 72bceafa3..17ccdae6e 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java
@@ -26,7 +26,6 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -34,6 +33,7 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -65,7 +65,7 @@ public class SyncRequestProcessorMetricTest {
     }
 
     private Request createRquest(long sessionId, int xid) {
-        return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, ByteBuffer.wrap(new byte[10]), null);
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.setData, RequestRecord.fromBytes(new byte[10]), null);
     }
 
     @Test
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
index 99cd171c0..a619866d9 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
@@ -33,6 +33,7 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestRecord;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -100,7 +101,7 @@ public class LeaderSessionTrackerTest extends ZKTestCase {
 
         LOG.info("Fake session Id: {}", Long.toHexString(fakeSessionId));
 
-        Request request = new Request(null, fakeSessionId, 0, OpCode.create, bb, new ArrayList<Id>());
+        Request request = new Request(null, fakeSessionId, 0, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList<Id>());
 
         // Submit request directly to leader
         leader.getActiveServer().submitRequest(request);
@@ -138,7 +139,7 @@ public class LeaderSessionTrackerTest extends ZKTestCase {
 
         LOG.info("Local session Id: {}", Long.toHexString(locallSession));
 
-        Request request = new Request(null, locallSession, 0, OpCode.create, bb, new ArrayList<Id>());
+        Request request = new Request(null, locallSession, 0, OpCode.create, RequestRecord.fromBytes(bb), new ArrayList<Id>());
 
         // Submit request directly to leader
         leader.getActiveServer().submitRequest(request);