You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2018/12/12 00:33:40 UTC

[geode] branch develop updated: GEODE-6166: Improves put performance (#2970)

This is an automated email from the ASF dual-hosted git repository.

jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2d6a617  GEODE-6166: Improves put performance (#2970)
2d6a617 is described below

commit 2d6a6177daab59d3cf6ae8c3e78a884a4e1941c2
Author: Jacob Barrett <jb...@pivotal.io>
AuthorDate: Tue Dec 11 16:33:31 2018 -0800

    GEODE-6166: Improves put performance (#2970)
    
    * Encode Operation ordinal in Message Part directly
    * Don't allocate hashmap until we need it.
    * Add putByte to Part and pre-allocate serialized form.
    * Use Deque to avoid allocating lots of LinkedList.Node objects
---
 .../geode/cache/client/internal/DestroyOp.java     |  8 +--
 .../cache/client/internal/OpExecutorImpl.java      |  8 ++-
 .../apache/geode/cache/client/internal/PutOp.java  |  4 +-
 .../internal/pooling/ConnectionManagerImpl.java    | 21 +++----
 .../geode/internal/InternalDataSerializer.java     | 13 ++--
 .../internal/cache/tier/sockets/BaseCommand.java   | 23 +++++++
 .../geode/internal/cache/tier/sockets/Message.java |  7 +++
 .../geode/internal/cache/tier/sockets/Part.java    | 25 ++++++++
 .../cache/tier/sockets/command/Destroy65.java      | 66 +++++++++----------
 .../internal/cache/tier/sockets/command/Put65.java | 73 +++++++++++-----------
 10 files changed, 149 insertions(+), 99 deletions(-)

diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/DestroyOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/DestroyOp.java
index 1eac281..118888a 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/DestroyOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/DestroyOp.java
@@ -139,9 +139,7 @@ public class DestroyOp {
       getMessage().addStringPart(region.getFullPath());
       getMessage().addStringOrObjPart(key);
       getMessage().addObjPart(expectedOldValue);
-      getMessage().addObjPart(operation == Operation.DESTROY ? null : operation); // server
-                                                                                  // interprets null
-                                                                                  // as DESTROY
+      getMessage().addBytePart(operation.ordinal);
       getMessage().addBytesPart(event.getEventId().calcBytes());
       if (callbackArg != null) {
         getMessage().addObjPart(callbackArg);
@@ -158,9 +156,7 @@ public class DestroyOp {
       getMessage().addStringPart(region);
       getMessage().addStringOrObjPart(key);
       getMessage().addObjPart(expectedOldValue);
-      getMessage().addObjPart(operation == Operation.DESTROY ? null : operation); // server
-                                                                                  // interprets null
-                                                                                  // as DESTROY
+      getMessage().addBytePart(operation.ordinal);
       getMessage().addBytesPart(event.getEventId().calcBytes());
       if (callbackArg != null) {
         getMessage().addObjPart(callbackArg);
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
index 546c9ed..18bc600 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/OpExecutorImpl.java
@@ -142,8 +142,6 @@ public class OpExecutorImpl implements ExecutablePool {
     }
     boolean success = false;
 
-    Set attemptedServers = new HashSet();
-
     Connection conn = (Connection) (threadLocalConnections ? localConnection.get() : null);
     if (conn == null || conn.isDestroyed()) {
       conn = connectionManager.borrowConnection(serverTimeout);
@@ -159,6 +157,8 @@ public class OpExecutorImpl implements ExecutablePool {
       }
     }
     try {
+      Set attemptedServers = null;
+
       for (int attempt = 0; true; attempt++) {
         // when an op is retried we may need to try to recover the previous
         // attempt's version stamp
@@ -178,6 +178,10 @@ public class OpExecutorImpl implements ExecutablePool {
           // It also unsets the threadlocal connection and notifies
           // the connection manager if there are failures.
           handleException(e, conn, attempt, attempt >= retries && retries != -1);
+          if (null == attemptedServers) {
+            // don't allocate this until we need it
+            attemptedServers = new HashSet();
+          }
           attemptedServers.add(conn.getServer());
           try {
             conn = connectionManager.exchangeConnection(conn, attemptedServers, serverTimeout);
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
index 580aba9..231780e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PutOp.java
@@ -182,7 +182,7 @@ public class PutOp {
       this.requireOldValue = requireOldValue;
       this.expectedOldValue = expectedOldValue;
       getMessage().addStringPart(regionName);
-      getMessage().addObjPart(op);
+      getMessage().addBytePart(op.ordinal);
       int flags = 0;
       if (requireOldValue)
         flags |= 0x01;
@@ -246,7 +246,7 @@ public class PutOp {
         logger.debug("PutOpImpl constructing message with operation={}", op);
       }
       getMessage().addStringPart(region.getFullPath());
-      getMessage().addObjPart(op);
+      getMessage().addBytePart(op.ordinal);
       int flags = 0;
       if (requireOldValue)
         flags |= 0x01;
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
index 39f482a..465ae75 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/pooling/ConnectionManagerImpl.java
@@ -15,6 +15,7 @@
 package org.apache.geode.cache.client.internal.pooling;
 
 import java.net.SocketException;
+import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -73,8 +74,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
   private final String poolName;
   private final PoolStats poolStats;
   protected final long prefillRetry; // ms
-  private final LinkedList/* <PooledConnection> */ availableConnections =
-      new LinkedList/* <PooledConnection> */();
+  private final ArrayDeque<PooledConnection> availableConnections = new ArrayDeque<>();
   protected final ConnectionMap allConnectionsMap = new ConnectionMap();
   private final EndpointManager endpointManager;
   private final int maxConnections;
@@ -213,7 +213,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
       }
 
       while (!availableConnections.isEmpty()) {
-        PooledConnection connection = (PooledConnection) availableConnections.removeFirst();
+        PooledConnection connection = availableConnections.removeFirst();
         try {
           connection.activate();
           return connection;
@@ -280,8 +280,8 @@ public class ConnectionManagerImpl implements ConnectionManager {
       if (shuttingDown) {
         throw new PoolCancelledException();
       }
-      for (Iterator itr = availableConnections.iterator(); itr.hasNext();) {
-        PooledConnection nextConnection = (PooledConnection) itr.next();
+      for (Iterator<PooledConnection> itr = availableConnections.iterator(); itr.hasNext();) {
+        PooledConnection nextConnection = itr.next();
         try {
           nextConnection.activate();
           if (nextConnection.getServer().equals(server)) {
@@ -354,8 +354,8 @@ public class ConnectionManagerImpl implements ConnectionManager {
       if (shuttingDown) {
         throw new PoolCancelledException();
       }
-      for (Iterator itr = availableConnections.iterator(); itr.hasNext();) {
-        PooledConnection nextConnection = (PooledConnection) itr.next();
+      for (Iterator<PooledConnection> itr = availableConnections.iterator(); itr.hasNext();) {
+        PooledConnection nextConnection = itr.next();
         if (!excludedServers.contains(nextConnection.getServer())) {
           itr.remove();
           try {
@@ -492,12 +492,7 @@ public class ConnectionManagerImpl implements ConnectionManager {
         }
       }
 
-      for (Iterator itr = availableConnections.iterator(); itr.hasNext();) {
-        PooledConnection conn = (PooledConnection) itr.next();
-        if (badConnections.contains(conn)) {
-          itr.remove();
-        }
-      }
+      availableConnections.removeIf(badConnections::contains);
 
       connectionCount -= badConnections.size();
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index 4ac7b9a..e17798f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -2180,7 +2180,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
     // Handle special objects first
     if (o == null) {
       out.writeByte(DSCODE.NULL.toByte());
-
     } else if (o instanceof DataSerializableFixedID) {
       checkPdxCompatible(o, ensurePdxCompatibility);
       DataSerializableFixedID dsfid = (DataSerializableFixedID) o;
@@ -3008,6 +3007,13 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
   public static boolean writePdx(DataOutput out, InternalCache internalCache, Object pdx,
       PdxSerializer pdxSerializer) throws IOException {
+
+    // Hack to make sure we don't pass internal objects to the user's serializer
+    if (pdxSerializer != null &&
+        isGemfireObject(pdx)) {
+      return false;
+    }
+
     TypeRegistry tr = null;
     if (internalCache != null) {
       tr = internalCache.getPdxRegistry();
@@ -3023,11 +3029,6 @@ public abstract class InternalDataSerializer extends DataSerializer {
 
     try {
       if (pdxSerializer != null) {
-        // Hack to make sure we don't pass internal objects to the user's
-        // serializer
-        if (isGemfireObject(pdx)) {
-          return false;
-        }
         if (is662SerializationEnabled()) {
           boolean alreadyInProgress = isPdxSerializationInProgress();
           if (!alreadyInProgress) {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
index 9d5e490..9df5d92 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/BaseCommand.java
@@ -1443,4 +1443,27 @@ public abstract class BaseCommand implements Command {
       appendInterestResponseKey(region, riKey, entryKey, collector, servConn);
     }
   }
+
+  protected static Operation getOperation(final Part operationPart,
+      final Operation defaultOperation) throws Exception {
+
+    if (operationPart.isBytes()) {
+      final byte[] bytes = operationPart.getSerializedForm();
+      if (null == bytes || 0 == bytes.length) {
+        // older clients can send empty bytes for default operation.
+        return defaultOperation;
+      } else {
+        return Operation.fromOrdinal(bytes[0]);
+      }
+    }
+
+    // Fallback for older clients.
+    final Operation operation = (Operation) operationPart.getObject();
+    if (operation == null) {
+      // native clients may send a null since the op is java-serialized.
+      return defaultOperation;
+    }
+    return operation;
+  }
+
 }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index 4876de6..e819930 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -418,6 +418,13 @@ public class Message {
     this.currentPart++;
   }
 
+  public void addBytePart(byte v) {
+    this.messageModified = true;
+    Part part = this.partsList[this.currentPart];
+    part.setByte(v);
+    this.currentPart++;
+  }
+
   /**
    * Adds a new part to this message that may contain a serialized object.
    */
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
index 9606e6f..4a853d7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Part.java
@@ -172,6 +172,31 @@ public class Part {
     return CacheServerHelper.fromUTF((byte[]) this.part);
   }
 
+  private static final byte[][] BYTES = new byte[256][1];
+  private static final int BYTES_OFFSET = -1 * Byte.MIN_VALUE;
+  static {
+    for (byte i = Byte.MIN_VALUE; i < Byte.MAX_VALUE; i++) {
+      BYTES[i + BYTES_OFFSET][0] = i;
+    }
+  }
+
+  public void setByte(byte b) {
+    this.typeCode = BYTE_CODE;
+    this.part = BYTES[b + BYTES_OFFSET];
+  }
+
+  public byte getByte() {
+    if (!isBytes()) {
+      Assert.assertTrue(false, "expected int part to be of type BYTE, part = " + this.toString());
+    }
+    if (getLength() != 1) {
+      Assert.assertTrue(false,
+          "expected int length to be 1 but it was " + getLength() + "; part = " + this.toString());
+    }
+    final byte[] bytes = getSerializedForm();
+    return bytes[0];
+  }
+
   public int getInt() {
     if (!isBytes()) {
       Assert.assertTrue(false, "expected int part to be of type BYTE, part = " + this.toString());
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java
index 708d8b9..f175b36 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Destroy65.java
@@ -27,10 +27,8 @@ import org.apache.geode.distributed.internal.DistributionStats;
 import org.apache.geode.internal.cache.EventID;
 import org.apache.geode.internal.cache.EventIDHolder;
 import org.apache.geode.internal.cache.LocalRegion;
-import org.apache.geode.internal.cache.OpType;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.Token;
-import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -97,19 +95,7 @@ public class Destroy65 extends BaseCommand {
   @Override
   public void cmdExecute(final Message clientMessage, final ServerConnection serverConnection,
       final SecurityService securityService, long start) throws IOException, InterruptedException {
-    Part regionNamePart;
-    Part keyPart;
-    Part callbackArgPart;
-    Part eventPart;
-    Part expectedOldValuePart;
-
-    Object operation = null;
-    Object expectedOldValue = null;
-
-    String regionName = null;
-    Object callbackArg = null, key = null;
     StringBuilder errMessage = new StringBuilder();
-    CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
     CacheServerStats stats = serverConnection.getCacheServerStats();
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
 
@@ -117,29 +103,35 @@ public class Destroy65 extends BaseCommand {
     stats.incReadDestroyRequestTime(now - start);
 
     // Retrieve the data from the message parts
-    regionNamePart = clientMessage.getPart(0);
-    keyPart = clientMessage.getPart(1);
-    expectedOldValuePart = clientMessage.getPart(2);
-    try {
-
-      operation = clientMessage.getPart(3).getObject();
-
-      if (((operation instanceof Operation) && ((Operation) operation == Operation.REMOVE))
-          || ((operation instanceof Byte) && (Byte) operation == OpType.DESTROY))
+    final Part regionNamePart = clientMessage.getPart(0);
+    final Part keyPart = clientMessage.getPart(1);
+    final Part expectedOldValuePart = clientMessage.getPart(2);
 
-      {
-        expectedOldValue = expectedOldValuePart.getObject();
-      }
+    final Operation operation;
+    try {
+      operation = getOperation(clientMessage.getPart(3), Operation.DESTROY);
     } catch (Exception e) {
       writeException(clientMessage, e, false, serverConnection);
       serverConnection.setAsTrue(RESPONDED);
       return;
     }
 
-    eventPart = clientMessage.getPart(4);
+    Object expectedOldValue = null;
+    if (operation == Operation.REMOVE) {
+      try {
+        expectedOldValue = expectedOldValuePart.getObject();
+      } catch (Exception e) {
+        writeException(clientMessage, e, false, serverConnection);
+        serverConnection.setAsTrue(RESPONDED);
+        return;
+      }
+    }
+
+    final Part eventPart = clientMessage.getPart(4);
 
+    Object callbackArg = null;
     if (clientMessage.getNumberOfParts() > 5) {
-      callbackArgPart = clientMessage.getPart(5);
+      final Part callbackArgPart = clientMessage.getPart(5);
       try {
         callbackArg = callbackArgPart.getObject();
       } catch (Exception e) {
@@ -148,7 +140,8 @@ public class Destroy65 extends BaseCommand {
         return;
       }
     }
-    regionName = regionNamePart.getString();
+
+    final Object key;
     try {
       key = keyPart.getStringOrObject();
     } catch (Exception e) {
@@ -156,6 +149,9 @@ public class Destroy65 extends BaseCommand {
       serverConnection.setAsTrue(RESPONDED);
       return;
     }
+
+    final String regionName = regionNamePart.getString();
+
     if (logger.isDebugEnabled()) {
       logger.debug(
           "{}: Received destroy65 request ({} bytes; op={}) from {} for region {} key {}{} txId {}",
@@ -185,7 +181,7 @@ public class Destroy65 extends BaseCommand {
       return;
     }
 
-    LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
+    final LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
     if (region == null) {
       String reason = String.format("%s was not found during destroy request",
           regionName);
@@ -195,12 +191,12 @@ public class Destroy65 extends BaseCommand {
     }
 
     // Destroy the entry
-    ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
-    long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
-    long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
-    EventID eventId =
+    final ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
+    final long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
+    final long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
+    final EventID eventId =
         new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId);
-    EventIDHolder clientEvent = new EventIDHolder(eventId);
+    final EventIDHolder clientEvent = new EventIDHolder(eventId);
 
     Breadcrumbs.setEventId(eventId);
 
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
index d0597c9..a2bdd07 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/command/Put65.java
@@ -36,7 +36,6 @@ import org.apache.geode.internal.cache.LocalRegion;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.TXManagerImpl;
 import org.apache.geode.internal.cache.Token;
-import org.apache.geode.internal.cache.tier.CachedRegionHelper;
 import org.apache.geode.internal.cache.tier.Command;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.BaseCommand;
@@ -68,14 +67,8 @@ public class Put65 extends BaseCommand {
       final SecurityService securityService, long p_start)
       throws IOException, InterruptedException {
     long start = p_start;
-    Part regionNamePart = null, keyPart = null, valuePart = null, callbackArgPart = null;
-    String regionName = null;
-    Object callbackArg = null, key = null;
-    Part eventPart = null;
-    StringBuilder errMessage = new StringBuilder();
-    boolean isDelta = false;
-    CachedRegionHelper crHelper = serverConnection.getCachedRegionHelper();
-    CacheServerStats stats = serverConnection.getCacheServerStats();
+    final StringBuilder errMessage = new StringBuilder();
+    final CacheServerStats stats = serverConnection.getCacheServerStats();
 
     // requiresResponse = true;
     serverConnection.setAsTrue(REQUIRES_RESPONSE);
@@ -86,22 +79,22 @@ public class Put65 extends BaseCommand {
     }
     // Retrieve the data from the message parts
     int idx = 0;
-    regionNamePart = clientMessage.getPart(idx++);
-    Operation operation;
+
+    final Part regionNamePart = clientMessage.getPart(idx++);
+
+    final Operation operation;
     try {
-      operation = (Operation) clientMessage.getPart(idx++).getObject();
-      if (operation == null) { // native clients send a null since the op is java-serialized
-        operation = Operation.UPDATE;
-      }
-    } catch (ClassNotFoundException e) {
+      operation = getOperation(clientMessage.getPart(idx++), Operation.UPDATE);
+    } catch (Exception e) {
       writeException(clientMessage, e, false, serverConnection);
       serverConnection.setAsTrue(RESPONDED);
       return;
     }
-    int flags = clientMessage.getPart(idx++).getInt();
-    boolean requireOldValue = ((flags & 0x01) == 0x01);
-    boolean haveExpectedOldValue = ((flags & 0x02) == 0x02);
-    Object expectedOldValue = null;
+
+    final int flags = clientMessage.getPart(idx++).getInt();
+    final boolean requireOldValue = ((flags & 0x01) == 0x01);
+    final boolean haveExpectedOldValue = ((flags & 0x02) == 0x02);
+    final Object expectedOldValue;
     if (haveExpectedOldValue) {
       try {
         expectedOldValue = clientMessage.getPart(idx++).getObject();
@@ -110,10 +103,15 @@ public class Put65 extends BaseCommand {
         serverConnection.setAsTrue(RESPONDED);
         return;
       }
+    } else {
+      expectedOldValue = null;
     }
-    keyPart = clientMessage.getPart(idx++);
+
+    final Part keyPart = clientMessage.getPart(idx++);
+
+    final boolean isDelta;
     try {
-      isDelta = ((Boolean) clientMessage.getPart(idx).getObject()).booleanValue();
+      isDelta = ((Boolean) clientMessage.getPart(idx).getObject());
       idx += 1;
     } catch (Exception e) {
       writeException(clientMessage, MessageType.PUT_DELTA_ERROR, e, false, serverConnection);
@@ -121,10 +119,13 @@ public class Put65 extends BaseCommand {
       // CachePerfStats not available here.
       return;
     }
-    valuePart = clientMessage.getPart(idx++);
-    eventPart = clientMessage.getPart(idx++);
+
+    final Part valuePart = clientMessage.getPart(idx++);
+    final Part eventPart = clientMessage.getPart(idx++);
+
+    Object callbackArg = null;
     if (clientMessage.getNumberOfParts() > idx) {
-      callbackArgPart = clientMessage.getPart(idx++);
+      final Part callbackArgPart = clientMessage.getPart(idx++);
       try {
         callbackArg = callbackArgPart.getObject();
       } catch (Exception e) {
@@ -133,8 +134,8 @@ public class Put65 extends BaseCommand {
         return;
       }
     }
-    regionName = regionNamePart.getString();
 
+    final Object key;
     try {
       key = keyPart.getStringOrObject();
     } catch (Exception e) {
@@ -143,6 +144,8 @@ public class Put65 extends BaseCommand {
       return;
     }
 
+    final String regionName = regionNamePart.getString();
+
     final boolean isDebugEnabled = logger.isDebugEnabled();
     if (isDebugEnabled) {
       logger.debug(
@@ -155,14 +158,14 @@ public class Put65 extends BaseCommand {
     // Process the put request
     if (key == null || regionName == null) {
       if (key == null) {
-        String putMsg = " The input key for the put request is null";
+        final String putMsg = " The input key for the put request is null";
         if (isDebugEnabled) {
           logger.debug("{}:{}", serverConnection.getName(), putMsg);
         }
         errMessage.append(putMsg);
       }
       if (regionName == null) {
-        String putMsg = " The input region name for the put request is null";
+        final String putMsg = " The input region name for the put request is null";
         if (isDebugEnabled) {
           logger.debug("{}:{}", serverConnection.getName(), putMsg);
         }
@@ -174,9 +177,9 @@ public class Put65 extends BaseCommand {
       return;
     }
 
-    LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
+    final LocalRegion region = (LocalRegion) serverConnection.getCache().getRegion(regionName);
     if (region == null) {
-      String reason = " was not found during put request";
+      final String reason = " was not found during put request";
       writeRegionDestroyedEx(clientMessage, regionName, reason, serverConnection);
       serverConnection.setAsTrue(RESPONDED);
       return;
@@ -184,7 +187,7 @@ public class Put65 extends BaseCommand {
 
     if (valuePart.isNull() && operation != Operation.PUT_IF_ABSENT && region.containsKey(key)) {
       // Invalid to 'put' a null value in an existing key
-      String putMsg = " Attempted to put a null value for existing key " + key;
+      final String putMsg = " Attempted to put a null value for existing key " + key;
       if (isDebugEnabled) {
         logger.debug("{}:{}", serverConnection.getName(), putMsg);
       }
@@ -195,11 +198,11 @@ public class Put65 extends BaseCommand {
       return;
     }
 
-    ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
-    long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
-    long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
+    final ByteBuffer eventIdPartsBuffer = ByteBuffer.wrap(eventPart.getSerializedForm());
+    final long threadId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
+    final long sequenceId = EventID.readEventIdPartsFromOptmizedByteArray(eventIdPartsBuffer);
 
-    EventIDHolder clientEvent = new EventIDHolder(
+    final EventIDHolder clientEvent = new EventIDHolder(
         new EventID(serverConnection.getEventMemberIDByteArray(), threadId, sequenceId));
 
     Breadcrumbs.setEventId(clientEvent.getEventId());