You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2021/03/15 16:37:53 UTC

[geode] 01/02: GEODE-9021: Remove unnecessary buffer allocation in redis Coder (#6113)

This is an automated email from the ASF dual-hosted git repository.

jensdeppe pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 71c6eb68a6a79cef51433d5ab047a964181a9565
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Thu Mar 11 15:10:15 2021 -0800

    GEODE-9021: Remove unnecessary buffer allocation in redis Coder (#6113)
    
    (cherry picked from commit 00c0e7980bdead3995fc352c3e0f3f52cce4e4fb)
---
 .../redis/internal/executor/RedisResponse.java     |  46 ++--
 .../apache/geode/redis/internal/netty/Coder.java   | 252 ++++++++++-----------
 2 files changed, 138 insertions(+), 160 deletions(-)

diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index 9723987..f6e909e 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -33,11 +33,11 @@ import org.apache.geode.redis.internal.netty.CoderException;
 
 public class RedisResponse {
 
-  private final Function<ByteBufAllocator, ByteBuf> coderCallback;
+  private final Function<ByteBuf, ByteBuf> coderCallback;
 
   private Runnable afterWriteCallback;
 
-  private RedisResponse(Function<ByteBufAllocator, ByteBuf> coderCallback) {
+  private RedisResponse(Function<ByteBuf, ByteBuf> coderCallback) {
     this.coderCallback = coderCallback;
   }
 
@@ -52,37 +52,37 @@ public class RedisResponse {
   }
 
   public ByteBuf encode(ByteBufAllocator allocator) {
-    return coderCallback.apply(allocator);
+    return coderCallback.apply(allocator.buffer());
   }
 
   public static RedisResponse integer(long numericValue) {
-    return new RedisResponse((bba) -> Coder.getIntegerResponse(bba, numericValue));
+    return new RedisResponse((buffer) -> Coder.getIntegerResponse(buffer, numericValue));
   }
 
   public static RedisResponse integer(boolean exists) {
-    return new RedisResponse((bba) -> Coder.getIntegerResponse(bba, exists ? 1 : 0));
+    return new RedisResponse((buffer) -> Coder.getIntegerResponse(buffer, exists ? 1 : 0));
   }
 
   public static RedisResponse string(String stringValue) {
-    return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, stringValue));
+    return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, stringValue));
   }
 
   public static RedisResponse string(byte[] byteArray) {
-    return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, byteArray));
+    return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, byteArray));
   }
 
   public static RedisResponse bulkString(Object value) {
-    return new RedisResponse((bba) -> {
+    return new RedisResponse((buffer) -> {
       try {
-        return Coder.getBulkStringResponse(bba, value);
+        return Coder.getBulkStringResponse(buffer, value);
       } catch (CoderException e) {
-        return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+        return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
       }
     });
   }
 
   public static RedisResponse ok() {
-    return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, "OK"));
+    return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, "OK"));
   }
 
   public static RedisResponse nil() {
@@ -90,21 +90,21 @@ public class RedisResponse {
   }
 
   public static RedisResponse flattenedArray(Collection<Collection<?>> nestedCollection) {
-    return new RedisResponse((bba) -> {
+    return new RedisResponse((buffer) -> {
       try {
-        return Coder.getFlattenedArrayResponse(bba, nestedCollection);
+        return Coder.getFlattenedArrayResponse(buffer, nestedCollection);
       } catch (CoderException e) {
-        return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+        return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
       }
     });
   }
 
   public static RedisResponse array(Collection<?> collection) {
-    return new RedisResponse((bba) -> {
+    return new RedisResponse((buffer) -> {
       try {
-        return Coder.getArrayResponse(bba, collection);
+        return Coder.getArrayResponse(buffer, collection);
       } catch (CoderException e) {
-        return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+        return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
       }
     });
   }
@@ -118,21 +118,19 @@ public class RedisResponse {
   }
 
   public static RedisResponse error(String error) {
-    return new RedisResponse((bba) -> Coder.getErrorResponse(bba, error));
+    return new RedisResponse((buffer) -> Coder.getErrorResponse(buffer, error));
   }
 
   public static RedisResponse customError(String error) {
-    return new RedisResponse((bba) -> Coder.getCustomErrorResponse(bba, error));
+    return new RedisResponse((buffer) -> Coder.getCustomErrorResponse(buffer, error));
   }
 
   public static RedisResponse wrongType(String error) {
-    return new RedisResponse((bba) -> Coder.getWrongTypeResponse(bba, error));
+    return new RedisResponse((buffer) -> Coder.getWrongTypeResponse(buffer, error));
   }
 
   public static RedisResponse scan(BigInteger cursor, List<Object> scanResult) {
-
-    return new RedisResponse(
-        (bba) -> Coder.getScanResponse(bba, cursor, scanResult));
+    return new RedisResponse((buffer) -> Coder.getScanResponse(buffer, cursor, scanResult));
   }
 
   public static RedisResponse emptyScan() {
@@ -147,7 +145,7 @@ public class RedisResponse {
   }
 
   public static RedisResponse bigDecimal(BigDecimal numericValue) {
-    return new RedisResponse((bba) -> Coder.getBigDecimalResponse(bba, numericValue));
+    return new RedisResponse((buffer) -> Coder.getBigDecimalResponse(buffer, numericValue));
   }
 
 }
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 0f62e1f..19f709f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -22,7 +22,6 @@ import java.util.Collection;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
 
 import org.apache.geode.annotations.internal.MakeImmutable;
 import org.apache.geode.redis.internal.data.ByteArrayWrapper;
@@ -32,13 +31,6 @@ import org.apache.geode.redis.internal.data.ByteArrayWrapper;
  */
 public class Coder {
 
-
-  /*
-   * Take no chances on char to byte conversions with default charsets on jvms, so we'll hard code
-   * the UTF-8 symbol values as bytes here
-   */
-
-
   /**
    * byte identifier of a bulk string
    */
@@ -87,6 +79,10 @@ public class Coder {
 
   @MakeImmutable
   public static final byte[] err = stringToBytes("ERR ");
+
+  @MakeImmutable
+  public static final byte[] oom = stringToBytes("OOM ");
+
   @MakeImmutable
   public static final byte[] wrongType = stringToBytes("WRONGTYPE ");
 
@@ -112,94 +108,78 @@ public class Coder {
    */
   public static final String N_INF = "-inf";
 
-  public static ByteBuf getBulkStringResponse(ByteBufAllocator alloc, Object v)
+  public static ByteBuf getBulkStringResponse(ByteBuf buffer, Object v)
       throws CoderException {
-    ByteBuf response;
     byte[] toWrite;
 
     if (v == null) {
-      response = alloc.buffer();
-      response.writeBytes(bNIL);
+      buffer.writeBytes(bNIL);
     } else if (v instanceof byte[]) {
-      byte[] value = (byte[]) v;
-      response = alloc.buffer(value.length + 20);
-      toWrite = value;
-      writeStringResponse(response, toWrite);
+      toWrite = (byte[]) v;
+      writeStringResponse(buffer, toWrite);
     } else if (v instanceof ByteArrayWrapper) {
-      byte[] value = ((ByteArrayWrapper) v).toBytes();
-      response = alloc.buffer(value.length + 20);
-      toWrite = value;
-      writeStringResponse(response, toWrite);
+      toWrite = ((ByteArrayWrapper) v).toBytes();
+      writeStringResponse(buffer, toWrite);
     } else if (v instanceof Double) {
-      response = alloc.buffer();
-      toWrite = doubleToBytes(((Double) v).doubleValue());
-      writeStringResponse(response, toWrite);
+      toWrite = doubleToBytes((Double) v);
+      writeStringResponse(buffer, toWrite);
     } else if (v instanceof String) {
       String value = (String) v;
-      response = alloc.buffer(value.length() + 20);
       toWrite = stringToBytes(value);
-      writeStringResponse(response, toWrite);
+      writeStringResponse(buffer, toWrite);
     } else if (v instanceof Integer) {
-      response = alloc.buffer(15);
-      response.writeByte(INTEGER_ID);
-      response.writeBytes(intToBytes((Integer) v));
-      response.writeBytes(CRLFar);
+      buffer.writeByte(INTEGER_ID);
+      buffer.writeBytes(intToBytes((Integer) v));
+      buffer.writeBytes(CRLFar);
     } else if (v instanceof Long) {
-      response = alloc.buffer(15);
-      response.writeByte(INTEGER_ID);
-      response.writeBytes(intToBytes(((Long) v).intValue()));
-      response.writeBytes(CRLFar);
+      buffer.writeByte(INTEGER_ID);
+      buffer.writeBytes(intToBytes(((Long) v).intValue()));
+      buffer.writeBytes(CRLFar);
     } else {
       throw new CoderException();
     }
 
-    return response;
+    return buffer;
   }
 
-  private static void writeStringResponse(ByteBuf response, byte[] toWrite) {
-    response.writeByte(BULK_STRING_ID);
-    response.writeBytes(intToBytes(toWrite.length));
-    response.writeBytes(CRLFar);
-    response.writeBytes(toWrite);
-    response.writeBytes(CRLFar);
+  private static void writeStringResponse(ByteBuf buffer, byte[] toWrite) {
+    buffer.writeByte(BULK_STRING_ID);
+    buffer.writeBytes(intToBytes(toWrite.length));
+    buffer.writeBytes(CRLFar);
+    buffer.writeBytes(toWrite);
+    buffer.writeBytes(CRLFar);
   }
 
-  public static ByteBuf getFlattenedArrayResponse(ByteBufAllocator alloc,
-      Collection<Collection<?>> items)
+  public static ByteBuf getFlattenedArrayResponse(ByteBuf buffer, Collection<Collection<?>> items)
       throws CoderException {
-    ByteBuf response = alloc.buffer();
-
     for (Object next : items) {
-      writeCollectionOrString(alloc, response, next);
+      writeCollectionOrString(buffer, next);
     }
 
-    return response;
+    return buffer;
   }
 
-  public static ByteBuf getArrayResponse(ByteBufAllocator alloc, Collection<?> items)
+  public static ByteBuf getArrayResponse(ByteBuf buffer, Collection<?> items)
       throws CoderException {
-    ByteBuf response = alloc.buffer();
-    response.writeByte(ARRAY_ID);
-    response.writeBytes(intToBytes(items.size()));
-    response.writeBytes(CRLFar);
+    buffer.writeByte(ARRAY_ID);
+    buffer.writeBytes(intToBytes(items.size()));
+    buffer.writeBytes(CRLFar);
     for (Object next : items) {
-      writeCollectionOrString(alloc, response, next);
+      writeCollectionOrString(buffer, next);
     }
 
-    return response;
+    return buffer;
   }
 
-  private static void writeCollectionOrString(ByteBufAllocator alloc, ByteBuf response, Object next)
+  private static void writeCollectionOrString(ByteBuf buffer, Object next)
       throws CoderException {
     ByteBuf tmp = null;
     try {
       if (next instanceof Collection) {
         Collection<?> nextItems = (Collection<?>) next;
-        tmp = getArrayResponse(alloc, nextItems);
-        response.writeBytes(tmp);
+        getArrayResponse(buffer, nextItems);
       } else {
-        tmp = getBulkStringResponse(alloc, next);
-        response.writeBytes(tmp);
+        getBulkStringResponse(buffer, next);
       }
     } finally {
       if (tmp != null) {
@@ -208,120 +188,120 @@ public class Coder {
     }
   }
 
-  public static ByteBuf getScanResponse(ByteBufAllocator alloc, BigInteger cursor,
+  public static ByteBuf getScanResponse(ByteBuf buffer, BigInteger cursor,
       List<Object> scanResult) {
-    ByteBuf response = alloc.buffer();
-
-    response.writeByte(ARRAY_ID);
-    response.writeBytes(intToBytes(2));
-    response.writeBytes(CRLFar);
-    response.writeByte(BULK_STRING_ID);
+    buffer.writeByte(ARRAY_ID);
+    buffer.writeBytes(intToBytes(2));
+    buffer.writeBytes(CRLFar);
+    buffer.writeByte(BULK_STRING_ID);
     byte[] cursorBytes = stringToBytes(cursor.toString());
-    response.writeBytes(intToBytes(cursorBytes.length));
-    response.writeBytes(CRLFar);
-    response.writeBytes(cursorBytes);
-    response.writeBytes(CRLFar);
-    response.writeByte(ARRAY_ID);
-    response.writeBytes(intToBytes(scanResult.size()));
-    response.writeBytes(CRLFar);
+    buffer.writeBytes(intToBytes(cursorBytes.length));
+    buffer.writeBytes(CRLFar);
+    buffer.writeBytes(cursorBytes);
+    buffer.writeBytes(CRLFar);
+    buffer.writeByte(ARRAY_ID);
+    buffer.writeBytes(intToBytes(scanResult.size()));
+    buffer.writeBytes(CRLFar);
 
     for (Object nextObject : scanResult) {
       if (nextObject instanceof String) {
         String next = (String) nextObject;
-        response.writeByte(BULK_STRING_ID);
-        response.writeBytes(intToBytes(next.length()));
-        response.writeBytes(CRLFar);
-        response.writeBytes(stringToBytes(next));
-        response.writeBytes(CRLFar);
+        buffer.writeByte(BULK_STRING_ID);
+        buffer.writeBytes(intToBytes(next.length()));
+        buffer.writeBytes(CRLFar);
+        buffer.writeBytes(stringToBytes(next));
+        buffer.writeBytes(CRLFar);
       } else if (nextObject instanceof ByteArrayWrapper) {
         byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
-        response.writeByte(BULK_STRING_ID);
-        response.writeBytes(intToBytes(next.length));
-        response.writeBytes(CRLFar);
-        response.writeBytes(next);
-        response.writeBytes(CRLFar);
+        buffer.writeByte(BULK_STRING_ID);
+        buffer.writeBytes(intToBytes(next.length));
+        buffer.writeBytes(CRLFar);
+        buffer.writeBytes(next);
+        buffer.writeBytes(CRLFar);
       }
     }
-    return response;
+    return buffer;
   }
 
-  public static ByteBuf getEmptyArrayResponse(ByteBufAllocator alloc) {
-    ByteBuf buf = alloc.buffer().writeBytes(bEMPTY_ARRAY);
-    return buf;
+  public static ByteBuf getEmptyArrayResponse(ByteBuf buffer) {
+    buffer.writeBytes(bEMPTY_ARRAY);
+    return buffer;
   }
 
-  public static ByteBuf getEmptyStringResponse(ByteBufAllocator alloc) {
-    ByteBuf buf = alloc.buffer().writeBytes(bEMPTY_STRING);
-    return buf;
+  public static ByteBuf getEmptyStringResponse(ByteBuf buffer) {
+    buffer.writeBytes(bEMPTY_STRING);
+    return buffer;
   }
 
-  public static ByteBuf getSimpleStringResponse(ByteBufAllocator alloc, String string) {
+  public static ByteBuf getSimpleStringResponse(ByteBuf buffer, String string) {
     byte[] simpAr = stringToBytes(string);
-    return getSimpleStringResponse(alloc, simpAr);
+    return getSimpleStringResponse(buffer, simpAr);
   }
 
-  public static ByteBuf getSimpleStringResponse(ByteBufAllocator alloc, byte[] byteArray) {
-    ByteBuf response = alloc.buffer(byteArray.length + 20);
-    response.writeByte(SIMPLE_STRING_ID);
-    response.writeBytes(byteArray);
-    response.writeBytes(CRLFar);
-    return response;
+  public static ByteBuf getSimpleStringResponse(ByteBuf buffer, byte[] byteArray) {
+    buffer.writeByte(SIMPLE_STRING_ID);
+    buffer.writeBytes(byteArray);
+    buffer.writeBytes(CRLFar);
+    return buffer;
+  }
+
+  public static ByteBuf getErrorResponse(ByteBuf buffer, String error) {
+    byte[] errorAr = stringToBytes(error);
+    buffer.writeByte(ERROR_ID);
+    buffer.writeBytes(err);
+    buffer.writeBytes(errorAr);
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getErrorResponse(ByteBufAllocator alloc, String error) {
+  public static ByteBuf getOOMResponse(ByteBuf buffer, String error) {
     byte[] errorAr = stringToBytes(error);
-    ByteBuf response = alloc.buffer(errorAr.length + 25);
-    response.writeByte(ERROR_ID);
-    response.writeBytes(err);
-    response.writeBytes(errorAr);
-    response.writeBytes(CRLFar);
-    return response;
+    buffer.writeByte(ERROR_ID);
+    buffer.writeBytes(oom);
+    buffer.writeBytes(errorAr);
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getCustomErrorResponse(ByteBufAllocator alloc, String error) {
+  public static ByteBuf getCustomErrorResponse(ByteBuf buffer, String error) {
     byte[] errorAr = stringToBytes(error);
-    ByteBuf response = alloc.buffer(errorAr.length + 25);
-    response.writeByte(ERROR_ID);
-    response.writeBytes(errorAr);
-    response.writeBytes(CRLFar);
-    return response;
+    buffer.writeByte(ERROR_ID);
+    buffer.writeBytes(errorAr);
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getWrongTypeResponse(ByteBufAllocator alloc, String error) {
+  public static ByteBuf getWrongTypeResponse(ByteBuf buffer, String error) {
     byte[] errorAr = stringToBytes(error);
-    ByteBuf response = alloc.buffer(errorAr.length + 31);
-    response.writeByte(ERROR_ID);
-    response.writeBytes(wrongType);
-    response.writeBytes(errorAr);
-    response.writeBytes(CRLFar);
-    return response;
+    buffer.writeByte(ERROR_ID);
+    buffer.writeBytes(wrongType);
+    buffer.writeBytes(errorAr);
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getIntegerResponse(ByteBufAllocator alloc, int integer) {
-    ByteBuf response = alloc.buffer(15);
-    response.writeByte(INTEGER_ID);
-    response.writeBytes(intToBytes(integer));
-    response.writeBytes(CRLFar);
-    return response;
+  public static ByteBuf getIntegerResponse(ByteBuf buffer, int integer) {
+    buffer.writeByte(INTEGER_ID);
+    buffer.writeBytes(intToBytes(integer));
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getIntegerResponse(ByteBufAllocator alloc, long l) {
-    ByteBuf response = alloc.buffer(25);
-    response.writeByte(INTEGER_ID);
-    response.writeBytes(longToBytes(l));
-    response.writeBytes(CRLFar);
-    return response;
+  public static ByteBuf getIntegerResponse(ByteBuf buffer, long l) {
+    buffer.writeByte(INTEGER_ID);
+    buffer.writeBytes(longToBytes(l));
+    buffer.writeBytes(CRLFar);
+    return buffer;
   }
 
-  public static ByteBuf getBigDecimalResponse(ByteBufAllocator alloc, BigDecimal b) {
-    ByteBuf response = alloc.buffer();
-    writeStringResponse(response, bigDecimalToBytes(b));
-    return response;
+  public static ByteBuf getBigDecimalResponse(ByteBuf buffer, BigDecimal b) {
+    writeStringResponse(buffer, bigDecimalToBytes(b));
+    return buffer;
   }
 
-  public static ByteBuf getNilResponse(ByteBufAllocator alloc) {
-    ByteBuf buf = alloc.buffer().writeBytes(bNIL);
-    return buf;
+  public static ByteBuf getNilResponse(ByteBuf buffer) {
+    buffer.writeBytes(bNIL);
+    return buffer;
   }