You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2021/03/15 16:37:53 UTC
[geode] 01/02: GEODE-9021: Remove unnecessary buffer allocation in redis Coder (#6113)
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch support/1.14
in repository https://gitbox.apache.org/repos/asf/geode.git
commit 71c6eb68a6a79cef51433d5ab047a964181a9565
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Thu Mar 11 15:10:15 2021 -0800
GEODE-9021: Remove unnecessary buffer allocation in redis Coder (#6113)
(cherry picked from commit 00c0e7980bdead3995fc352c3e0f3f52cce4e4fb)
---
.../redis/internal/executor/RedisResponse.java | 46 ++--
.../apache/geode/redis/internal/netty/Coder.java | 252 ++++++++++-----------
2 files changed, 138 insertions(+), 160 deletions(-)
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
index 9723987..f6e909e 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/executor/RedisResponse.java
@@ -33,11 +33,11 @@ import org.apache.geode.redis.internal.netty.CoderException;
public class RedisResponse {
- private final Function<ByteBufAllocator, ByteBuf> coderCallback;
+ private final Function<ByteBuf, ByteBuf> coderCallback;
private Runnable afterWriteCallback;
- private RedisResponse(Function<ByteBufAllocator, ByteBuf> coderCallback) {
+ private RedisResponse(Function<ByteBuf, ByteBuf> coderCallback) {
this.coderCallback = coderCallback;
}
@@ -52,37 +52,37 @@ public class RedisResponse {
}
public ByteBuf encode(ByteBufAllocator allocator) {
- return coderCallback.apply(allocator);
+ return coderCallback.apply(allocator.buffer());
}
public static RedisResponse integer(long numericValue) {
- return new RedisResponse((bba) -> Coder.getIntegerResponse(bba, numericValue));
+ return new RedisResponse((buffer) -> Coder.getIntegerResponse(buffer, numericValue));
}
public static RedisResponse integer(boolean exists) {
- return new RedisResponse((bba) -> Coder.getIntegerResponse(bba, exists ? 1 : 0));
+ return new RedisResponse((buffer) -> Coder.getIntegerResponse(buffer, exists ? 1 : 0));
}
public static RedisResponse string(String stringValue) {
- return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, stringValue));
+ return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, stringValue));
}
public static RedisResponse string(byte[] byteArray) {
- return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, byteArray));
+ return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, byteArray));
}
public static RedisResponse bulkString(Object value) {
- return new RedisResponse((bba) -> {
+ return new RedisResponse((buffer) -> {
try {
- return Coder.getBulkStringResponse(bba, value);
+ return Coder.getBulkStringResponse(buffer, value);
} catch (CoderException e) {
- return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+ return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
}
});
}
public static RedisResponse ok() {
- return new RedisResponse((bba) -> Coder.getSimpleStringResponse(bba, "OK"));
+ return new RedisResponse((buffer) -> Coder.getSimpleStringResponse(buffer, "OK"));
}
public static RedisResponse nil() {
@@ -90,21 +90,21 @@ public class RedisResponse {
}
public static RedisResponse flattenedArray(Collection<Collection<?>> nestedCollection) {
- return new RedisResponse((bba) -> {
+ return new RedisResponse((buffer) -> {
try {
- return Coder.getFlattenedArrayResponse(bba, nestedCollection);
+ return Coder.getFlattenedArrayResponse(buffer, nestedCollection);
} catch (CoderException e) {
- return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+ return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
}
});
}
public static RedisResponse array(Collection<?> collection) {
- return new RedisResponse((bba) -> {
+ return new RedisResponse((buffer) -> {
try {
- return Coder.getArrayResponse(bba, collection);
+ return Coder.getArrayResponse(buffer, collection);
} catch (CoderException e) {
- return Coder.getErrorResponse(bba, "Internal server error: " + e.getMessage());
+ return Coder.getErrorResponse(buffer, "Internal server error: " + e.getMessage());
}
});
}
@@ -118,21 +118,19 @@ public class RedisResponse {
}
public static RedisResponse error(String error) {
- return new RedisResponse((bba) -> Coder.getErrorResponse(bba, error));
+ return new RedisResponse((buffer) -> Coder.getErrorResponse(buffer, error));
}
public static RedisResponse customError(String error) {
- return new RedisResponse((bba) -> Coder.getCustomErrorResponse(bba, error));
+ return new RedisResponse((buffer) -> Coder.getCustomErrorResponse(buffer, error));
}
public static RedisResponse wrongType(String error) {
- return new RedisResponse((bba) -> Coder.getWrongTypeResponse(bba, error));
+ return new RedisResponse((buffer) -> Coder.getWrongTypeResponse(buffer, error));
}
public static RedisResponse scan(BigInteger cursor, List<Object> scanResult) {
-
- return new RedisResponse(
- (bba) -> Coder.getScanResponse(bba, cursor, scanResult));
+ return new RedisResponse((buffer) -> Coder.getScanResponse(buffer, cursor, scanResult));
}
public static RedisResponse emptyScan() {
@@ -147,7 +145,7 @@ public class RedisResponse {
}
public static RedisResponse bigDecimal(BigDecimal numericValue) {
- return new RedisResponse((bba) -> Coder.getBigDecimalResponse(bba, numericValue));
+ return new RedisResponse((buffer) -> Coder.getBigDecimalResponse(buffer, numericValue));
}
}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
index 0f62e1f..19f709f 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/Coder.java
@@ -22,7 +22,6 @@ import java.util.Collection;
import java.util.List;
import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufAllocator;
import org.apache.geode.annotations.internal.MakeImmutable;
import org.apache.geode.redis.internal.data.ByteArrayWrapper;
@@ -32,13 +31,6 @@ import org.apache.geode.redis.internal.data.ByteArrayWrapper;
*/
public class Coder {
-
- /*
- * Take no chances on char to byte conversions with default charsets on jvms, so we'll hard code
- * the UTF-8 symbol values as bytes here
- */
-
-
/**
* byte identifier of a bulk string
*/
@@ -87,6 +79,10 @@ public class Coder {
@MakeImmutable
public static final byte[] err = stringToBytes("ERR ");
+
+ @MakeImmutable
+ public static final byte[] oom = stringToBytes("OOM ");
+
@MakeImmutable
public static final byte[] wrongType = stringToBytes("WRONGTYPE ");
@@ -112,94 +108,78 @@ public class Coder {
*/
public static final String N_INF = "-inf";
- public static ByteBuf getBulkStringResponse(ByteBufAllocator alloc, Object v)
+ public static ByteBuf getBulkStringResponse(ByteBuf buffer, Object v)
throws CoderException {
- ByteBuf response;
byte[] toWrite;
if (v == null) {
- response = alloc.buffer();
- response.writeBytes(bNIL);
+ buffer.writeBytes(bNIL);
} else if (v instanceof byte[]) {
- byte[] value = (byte[]) v;
- response = alloc.buffer(value.length + 20);
- toWrite = value;
- writeStringResponse(response, toWrite);
+ toWrite = (byte[]) v;
+ writeStringResponse(buffer, toWrite);
} else if (v instanceof ByteArrayWrapper) {
- byte[] value = ((ByteArrayWrapper) v).toBytes();
- response = alloc.buffer(value.length + 20);
- toWrite = value;
- writeStringResponse(response, toWrite);
+ toWrite = ((ByteArrayWrapper) v).toBytes();
+ writeStringResponse(buffer, toWrite);
} else if (v instanceof Double) {
- response = alloc.buffer();
- toWrite = doubleToBytes(((Double) v).doubleValue());
- writeStringResponse(response, toWrite);
+ toWrite = doubleToBytes((Double) v);
+ writeStringResponse(buffer, toWrite);
} else if (v instanceof String) {
String value = (String) v;
- response = alloc.buffer(value.length() + 20);
toWrite = stringToBytes(value);
- writeStringResponse(response, toWrite);
+ writeStringResponse(buffer, toWrite);
} else if (v instanceof Integer) {
- response = alloc.buffer(15);
- response.writeByte(INTEGER_ID);
- response.writeBytes(intToBytes((Integer) v));
- response.writeBytes(CRLFar);
+ buffer.writeByte(INTEGER_ID);
+ buffer.writeBytes(intToBytes((Integer) v));
+ buffer.writeBytes(CRLFar);
} else if (v instanceof Long) {
- response = alloc.buffer(15);
- response.writeByte(INTEGER_ID);
- response.writeBytes(intToBytes(((Long) v).intValue()));
- response.writeBytes(CRLFar);
+ buffer.writeByte(INTEGER_ID);
+ buffer.writeBytes(intToBytes(((Long) v).intValue()));
+ buffer.writeBytes(CRLFar);
} else {
throw new CoderException();
}
- return response;
+ return buffer;
}
- private static void writeStringResponse(ByteBuf response, byte[] toWrite) {
- response.writeByte(BULK_STRING_ID);
- response.writeBytes(intToBytes(toWrite.length));
- response.writeBytes(CRLFar);
- response.writeBytes(toWrite);
- response.writeBytes(CRLFar);
+ private static void writeStringResponse(ByteBuf buffer, byte[] toWrite) {
+ buffer.writeByte(BULK_STRING_ID);
+ buffer.writeBytes(intToBytes(toWrite.length));
+ buffer.writeBytes(CRLFar);
+ buffer.writeBytes(toWrite);
+ buffer.writeBytes(CRLFar);
}
- public static ByteBuf getFlattenedArrayResponse(ByteBufAllocator alloc,
- Collection<Collection<?>> items)
+ public static ByteBuf getFlattenedArrayResponse(ByteBuf buffer, Collection<Collection<?>> items)
throws CoderException {
- ByteBuf response = alloc.buffer();
-
for (Object next : items) {
- writeCollectionOrString(alloc, response, next);
+ writeCollectionOrString(buffer, next);
}
- return response;
+ return buffer;
}
- public static ByteBuf getArrayResponse(ByteBufAllocator alloc, Collection<?> items)
+ public static ByteBuf getArrayResponse(ByteBuf buffer, Collection<?> items)
throws CoderException {
- ByteBuf response = alloc.buffer();
- response.writeByte(ARRAY_ID);
- response.writeBytes(intToBytes(items.size()));
- response.writeBytes(CRLFar);
+ buffer.writeByte(ARRAY_ID);
+ buffer.writeBytes(intToBytes(items.size()));
+ buffer.writeBytes(CRLFar);
for (Object next : items) {
- writeCollectionOrString(alloc, response, next);
+ writeCollectionOrString(buffer, next);
}
- return response;
+ return buffer;
}
- private static void writeCollectionOrString(ByteBufAllocator alloc, ByteBuf response, Object next)
+ private static void writeCollectionOrString(ByteBuf buffer, Object next)
throws CoderException {
ByteBuf tmp = null;
try {
if (next instanceof Collection) {
Collection<?> nextItems = (Collection<?>) next;
- tmp = getArrayResponse(alloc, nextItems);
- response.writeBytes(tmp);
+ getArrayResponse(buffer, nextItems);
} else {
- tmp = getBulkStringResponse(alloc, next);
- response.writeBytes(tmp);
+ getBulkStringResponse(buffer, next);
}
} finally {
if (tmp != null) {
@@ -208,120 +188,120 @@ public class Coder {
}
}
- public static ByteBuf getScanResponse(ByteBufAllocator alloc, BigInteger cursor,
+ public static ByteBuf getScanResponse(ByteBuf buffer, BigInteger cursor,
List<Object> scanResult) {
- ByteBuf response = alloc.buffer();
-
- response.writeByte(ARRAY_ID);
- response.writeBytes(intToBytes(2));
- response.writeBytes(CRLFar);
- response.writeByte(BULK_STRING_ID);
+ buffer.writeByte(ARRAY_ID);
+ buffer.writeBytes(intToBytes(2));
+ buffer.writeBytes(CRLFar);
+ buffer.writeByte(BULK_STRING_ID);
byte[] cursorBytes = stringToBytes(cursor.toString());
- response.writeBytes(intToBytes(cursorBytes.length));
- response.writeBytes(CRLFar);
- response.writeBytes(cursorBytes);
- response.writeBytes(CRLFar);
- response.writeByte(ARRAY_ID);
- response.writeBytes(intToBytes(scanResult.size()));
- response.writeBytes(CRLFar);
+ buffer.writeBytes(intToBytes(cursorBytes.length));
+ buffer.writeBytes(CRLFar);
+ buffer.writeBytes(cursorBytes);
+ buffer.writeBytes(CRLFar);
+ buffer.writeByte(ARRAY_ID);
+ buffer.writeBytes(intToBytes(scanResult.size()));
+ buffer.writeBytes(CRLFar);
for (Object nextObject : scanResult) {
if (nextObject instanceof String) {
String next = (String) nextObject;
- response.writeByte(BULK_STRING_ID);
- response.writeBytes(intToBytes(next.length()));
- response.writeBytes(CRLFar);
- response.writeBytes(stringToBytes(next));
- response.writeBytes(CRLFar);
+ buffer.writeByte(BULK_STRING_ID);
+ buffer.writeBytes(intToBytes(next.length()));
+ buffer.writeBytes(CRLFar);
+ buffer.writeBytes(stringToBytes(next));
+ buffer.writeBytes(CRLFar);
} else if (nextObject instanceof ByteArrayWrapper) {
byte[] next = ((ByteArrayWrapper) nextObject).toBytes();
- response.writeByte(BULK_STRING_ID);
- response.writeBytes(intToBytes(next.length));
- response.writeBytes(CRLFar);
- response.writeBytes(next);
- response.writeBytes(CRLFar);
+ buffer.writeByte(BULK_STRING_ID);
+ buffer.writeBytes(intToBytes(next.length));
+ buffer.writeBytes(CRLFar);
+ buffer.writeBytes(next);
+ buffer.writeBytes(CRLFar);
}
}
- return response;
+ return buffer;
}
- public static ByteBuf getEmptyArrayResponse(ByteBufAllocator alloc) {
- ByteBuf buf = alloc.buffer().writeBytes(bEMPTY_ARRAY);
- return buf;
+ public static ByteBuf getEmptyArrayResponse(ByteBuf buffer) {
+ buffer.writeBytes(bEMPTY_ARRAY);
+ return buffer;
}
- public static ByteBuf getEmptyStringResponse(ByteBufAllocator alloc) {
- ByteBuf buf = alloc.buffer().writeBytes(bEMPTY_STRING);
- return buf;
+ public static ByteBuf getEmptyStringResponse(ByteBuf buffer) {
+ buffer.writeBytes(bEMPTY_STRING);
+ return buffer;
}
- public static ByteBuf getSimpleStringResponse(ByteBufAllocator alloc, String string) {
+ public static ByteBuf getSimpleStringResponse(ByteBuf buffer, String string) {
byte[] simpAr = stringToBytes(string);
- return getSimpleStringResponse(alloc, simpAr);
+ return getSimpleStringResponse(buffer, simpAr);
}
- public static ByteBuf getSimpleStringResponse(ByteBufAllocator alloc, byte[] byteArray) {
- ByteBuf response = alloc.buffer(byteArray.length + 20);
- response.writeByte(SIMPLE_STRING_ID);
- response.writeBytes(byteArray);
- response.writeBytes(CRLFar);
- return response;
+ public static ByteBuf getSimpleStringResponse(ByteBuf buffer, byte[] byteArray) {
+ buffer.writeByte(SIMPLE_STRING_ID);
+ buffer.writeBytes(byteArray);
+ buffer.writeBytes(CRLFar);
+ return buffer;
+ }
+
+ public static ByteBuf getErrorResponse(ByteBuf buffer, String error) {
+ byte[] errorAr = stringToBytes(error);
+ buffer.writeByte(ERROR_ID);
+ buffer.writeBytes(err);
+ buffer.writeBytes(errorAr);
+ buffer.writeBytes(CRLFar);
+ return buffer;
}
- public static ByteBuf getErrorResponse(ByteBufAllocator alloc, String error) {
+ public static ByteBuf getOOMResponse(ByteBuf buffer, String error) {
byte[] errorAr = stringToBytes(error);
- ByteBuf response = alloc.buffer(errorAr.length + 25);
- response.writeByte(ERROR_ID);
- response.writeBytes(err);
- response.writeBytes(errorAr);
- response.writeBytes(CRLFar);
- return response;
+ buffer.writeByte(ERROR_ID);
+ buffer.writeBytes(oom);
+ buffer.writeBytes(errorAr);
+ buffer.writeBytes(CRLFar);
+ return buffer;
}
- public static ByteBuf getCustomErrorResponse(ByteBufAllocator alloc, String error) {
+ public static ByteBuf getCustomErrorResponse(ByteBuf buffer, String error) {
byte[] errorAr = stringToBytes(error);
- ByteBuf response = alloc.buffer(errorAr.length + 25);
- response.writeByte(ERROR_ID);
- response.writeBytes(errorAr);
- response.writeBytes(CRLFar);
- return response;
+ buffer.writeByte(ERROR_ID);
+ buffer.writeBytes(errorAr);
+ buffer.writeBytes(CRLFar);
+ return buffer;
}
- public static ByteBuf getWrongTypeResponse(ByteBufAllocator alloc, String error) {
+ public static ByteBuf getWrongTypeResponse(ByteBuf buffer, String error) {
byte[] errorAr = stringToBytes(error);
- ByteBuf response = alloc.buffer(errorAr.length + 31);
- response.writeByte(ERROR_ID);
- response.writeBytes(wrongType);
- response.writeBytes(errorAr);
- response.writeBytes(CRLFar);
- return response;
+ buffer.writeByte(ERROR_ID);
+ buffer.writeBytes(wrongType);
+ buffer.writeBytes(errorAr);
+ buffer.writeBytes(CRLFar);
+ return buffer;
}
- public static ByteBuf getIntegerResponse(ByteBufAllocator alloc, int integer) {
- ByteBuf response = alloc.buffer(15);
- response.writeByte(INTEGER_ID);
- response.writeBytes(intToBytes(integer));
- response.writeBytes(CRLFar);
- return response;
+ public static ByteBuf getIntegerResponse(ByteBuf buffer, int integer) {
+ buffer.writeByte(INTEGER_ID);
+ buffer.writeBytes(intToBytes(integer));
+ buffer.writeBytes(CRLFar);
+ return buffer;
}
- public static ByteBuf getIntegerResponse(ByteBufAllocator alloc, long l) {
- ByteBuf response = alloc.buffer(25);
- response.writeByte(INTEGER_ID);
- response.writeBytes(longToBytes(l));
- response.writeBytes(CRLFar);
- return response;
+ public static ByteBuf getIntegerResponse(ByteBuf buffer, long l) {
+ buffer.writeByte(INTEGER_ID);
+ buffer.writeBytes(longToBytes(l));
+ buffer.writeBytes(CRLFar);
+ return buffer;
}
- public static ByteBuf getBigDecimalResponse(ByteBufAllocator alloc, BigDecimal b) {
- ByteBuf response = alloc.buffer();
- writeStringResponse(response, bigDecimalToBytes(b));
- return response;
+ public static ByteBuf getBigDecimalResponse(ByteBuf buffer, BigDecimal b) {
+ writeStringResponse(buffer, bigDecimalToBytes(b));
+ return buffer;
}
- public static ByteBuf getNilResponse(ByteBufAllocator alloc) {
- ByteBuf buf = alloc.buffer().writeBytes(bNIL);
- return buf;
+ public static ByteBuf getNilResponse(ByteBuf buffer) {
+ buffer.writeBytes(bNIL);
+ return buffer;
}