You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by up...@apache.org on 2021/03/16 18:55:04 UTC

[geode] 31/36: Add RedisDecoder to the netty pipeline

This is an automated email from the ASF dual-hosted git repository.

upthewaterspout pushed a commit to branch feature/redis-performance-testing
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 420b28ce48cc8184703a11da86a444a7819fc10f
Author: Jens Deppe <jd...@vmware.com>
AuthorDate: Mon Mar 15 15:03:52 2021 -0700

    Add RedisDecoder to the netty pipeline
---
 .../internal/netty/MessageToCommandDecoder.java    | 61 ++++++++++++++++++++++
 .../redis/internal/netty/NettyRedisServer.java     |  9 +++-
 2 files changed, 68 insertions(+), 2 deletions(-)

diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java
new file mode 100644
index 0000000..3f0685c
--- /dev/null
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/MessageToCommandDecoder.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.geode.redis.internal.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageDecoder;
+import io.netty.handler.codec.redis.ArrayHeaderRedisMessage;
+import io.netty.handler.codec.redis.FullBulkStringRedisMessage;
+import io.netty.handler.codec.redis.RedisMessage;
+
+/**
+ * Gathers the bulk strings of one RESP array and emits a {@code Command} once all announced
+ * elements have arrived. Progress is held per instance (one decoder per channel pipeline)
+ * rather than per thread, because a single event loop thread serves many channels.
+ */
+public class MessageToCommandDecoder extends MessageToMessageDecoder<RedisMessage> {
+  private List<byte[]> elements = new ArrayList<>();
+  private long remaining = 0;
+
+  @Override
+  protected void decode(ChannelHandlerContext ctx, RedisMessage msg, List<Object> out)
+      throws Exception {
+    if (msg instanceof ArrayHeaderRedisMessage) {
+      // A new header starts a new command; discard leftovers of a malformed predecessor.
+      elements = new ArrayList<>();
+      remaining = ((ArrayHeaderRedisMessage) msg).length();
+      return;
+    }
+
+    if (msg instanceof FullBulkStringRedisMessage) {
+      ByteBuf buffer = ((FullBulkStringRedisMessage) msg).content();
+      byte[] data = new byte[buffer.readableBytes()];
+      buffer.readBytes(data);
+      elements.add(data);
+      remaining--;
+
+      if (remaining == 0) {
+        // The Command takes over this list; start a fresh one so later commands never alias it.
+        out.add(new Command(elements));
+        elements = new ArrayList<>();
+      }
+    }
+  }
+}
diff --git a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
index 3a1d903..5110696 100644
--- a/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
+++ b/geode-redis/src/main/java/org/apache/geode/redis/internal/netty/NettyRedisServer.java
@@ -43,6 +43,8 @@ import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.handler.codec.redis.RedisBulkStringAggregator;
+import io.netty.handler.codec.redis.RedisDecoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.timeout.WriteTimeoutHandler;
@@ -169,8 +171,11 @@ public class NettyRedisServer {
         }
         ChannelPipeline pipeline = socketChannel.pipeline();
         addSSLIfEnabled(socketChannel, pipeline);
-        pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(),
-            new ByteToCommandDecoder(redisStats));
+        // pipeline.addLast(ByteToCommandDecoder.class.getSimpleName(),
+        // new ByteToCommandDecoder(redisStats));
+        pipeline.addLast(new RedisDecoder());
+        pipeline.addLast(new RedisBulkStringAggregator());
+        pipeline.addLast(new MessageToCommandDecoder());
         pipeline.addLast(new WriteTimeoutHandler(10));
         pipeline.addLast(ExecutionHandlerContext.class.getSimpleName(),
             new ExecutionHandlerContext(socketChannel, regionProvider, pubsub,