You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/11/04 00:28:40 UTC

[dubbo] branch 3.2 updated: support encode in user thread, optimize decode in user thread (#10854)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 5016f550be support encode in user thread, optimize decode in user thread (#10854)
5016f550be is described below

commit 5016f550be52f14a232399409a3c97fa6d6db321
Author: 一个不知名的Java靓仔 <cl...@gmail.com>
AuthorDate: Fri Nov 4 08:28:23 2022 +0800

    support encode in user thread, optimize decode in user thread (#10854)
---
 .../dubbo/common/constants/CommonConstants.java    |  4 ++
 .../dubbo/common/utils/CacheableSupplier.java      | 43 ++++++++++++
 .../remoting/transport/netty4/NettyChannel.java    | 79 +++++++++++++++++++++-
 .../transport/netty4/NettyClientHandler.java       | 46 -------------
 .../transport/netty4/NettyCodecAdapter.java        | 26 +++++--
 .../transport/netty4/NettyChannelTest.java         | 15 +++-
 .../transport/netty4/NettyClientHandlerTest.java   | 10 +--
 .../protocol/dubbo/DecodeableRpcInvocation.java    |  8 ++-
 8 files changed, 170 insertions(+), 61 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index b8709881cd..01c1de33a4 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -600,4 +600,8 @@ public interface CommonConstants {
      *
      */
     String DUBBO_JSTACK_MAXLINE = "dubbo.jstack-dump.max-line";
+
+
+    String ENCODE_IN_IO_THREAD_KEY = "encode.in.io"; // URL parameter key; NettyChannel reads it as a boolean
+    boolean DEFAULT_ENCODE_IN_IO_THREAD = false; // value used when the URL does not carry the parameter
 }
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/CacheableSupplier.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/CacheableSupplier.java
new file mode 100644
index 0000000000..cc81d34d05
--- /dev/null
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/CacheableSupplier.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.common.utils;
+
+import java.util.function.Supplier;
+
+public class CacheableSupplier<T> implements Supplier<T> { // memoizes the first non-null value of a delegate
+
+    private volatile T object; // cached value; null while nothing has been cached
+
+    private final Supplier<T> supplier; // delegate that is asked for the value on demand
+
+    public CacheableSupplier(Supplier<T> supplier) { // the delegate is stored here, not invoked
+        this.supplier = supplier;
+    }
+
+    public static <T> CacheableSupplier<T> newSupplier(Supplier<T> supplier) { // factory form of the constructor
+        return new CacheableSupplier<>(supplier);
+    }
+
+    @Override
+    public synchronized T get() { // synchronized: the null check and the store must be one atomic step
+        if (object == null) {
+            object = supplier.get(); // a null result is not cached, so the next call asks the delegate again
+        }
+        return object;
+    }
+}
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
index 24e3a8057f..9006e713c7 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
@@ -16,16 +16,27 @@
  */
 package org.apache.dubbo.remoting.transport.netty4;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFutureListener;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.Codec;
+import org.apache.dubbo.remoting.Codec2;
+import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.buffer.ChannelBuffer;
+import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.remoting.exchange.Response;
 import org.apache.dubbo.remoting.transport.AbstractChannel;
+import org.apache.dubbo.remoting.transport.codec.CodecAdapter;
 import org.apache.dubbo.remoting.utils.PayloadDropper;
 
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import org.apache.dubbo.rpc.model.FrameworkModel;
 
 import java.net.InetSocketAddress;
 import java.util.Map;
@@ -33,8 +44,11 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_ENCODE_IN_IO_THREAD;
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.ENCODE_IN_IO_THREAD_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.rpc.model.ScopeModelUtil.getFrameworkModel;
 
 /**
  * NettyChannel maintains the cache of channel.
@@ -57,6 +71,10 @@ final class NettyChannel extends AbstractChannel {
 
     private final Netty4BatchWriteQueue writeQueue;
 
+    private final Codec2 codec; // resolved once from the channel URL by getChannelCodec(url) in the constructor
+
+    private final boolean encodeInIOThread; // from ENCODE_IN_IO_THREAD_KEY; when false, send() encodes before enqueueing
+
     /**
      * The constructor of NettyChannel.
      * It is private so NettyChannel usually create by {@link NettyChannel#getOrAddChannel(Channel, URL, ChannelHandler)}
@@ -72,6 +90,8 @@ final class NettyChannel extends AbstractChannel {
         }
         this.channel = channel;
         this.writeQueue = Netty4BatchWriteQueue.createWriteQueue(channel);
+        this.codec = getChannelCodec(url);
+        this.encodeInIOThread = getUrl().getParameter(ENCODE_IN_IO_THREAD_KEY, DEFAULT_ENCODE_IN_IO_THREAD);
     }
 
     /**
@@ -164,7 +184,33 @@ final class NettyChannel extends AbstractChannel {
         boolean success = true;
         int timeout = 0;
         try {
-            ChannelFuture future = writeQueue.enqueue(message);
+            Object outputMessage = message; // what is handed to the write queue: the message itself or its encoded bytes
+            if (!encodeInIOThread) { // encode right here, in the thread that called send(), before enqueueing
+                ByteBuf buf = channel.alloc().buffer(); // NOTE(review): not released in the visible code if encode() throws - confirm the enclosing catch handles it
+                ChannelBuffer buffer = new NettyBackedChannelBuffer(buf);
+                codec.encode(this, buffer, message);
+                outputMessage = buf; // the write queue now receives the encoded bytes instead of the message object
+            }
+            ChannelFuture future = writeQueue.enqueue(outputMessage).addListener(new ChannelFutureListener() {
+                @Override
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (!(message instanceof Request)) { // this listener only issues callbacks for Request messages
+                        return;
+                    }
+                    ChannelHandler handler = getChannelHandler();
+                    if (future.isSuccess()) {
+                        handler.sent(NettyChannel.this, message); // callback receives the original message, not the encoded buffer
+                    } else {
+                        Throwable t = future.cause();
+                        if (t == null) { // unsuccessful but no cause available: nothing is reported
+                            return;
+                        }
+                        Response response = buildErrorResponse((Request) message, t); // BAD_REQUEST response carrying the failure text
+                        handler.received(NettyChannel.this, response); // hand the error response to the handler's received callback
+                    }
+                }
+            });
+
             if (sent) {
                 // wait timeout ms
                 timeout = getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
@@ -287,4 +333,35 @@ final class NettyChannel extends AbstractChannel {
     public Channel getNioChannel() {
         return channel;
     }
+
+    /**
+     * Creates the response that is reported for a request whose write failed.
+     *
+     * @param request the request being answered; its id and version are copied
+     * @param t       the failure; its string form becomes the error message
+     * @return a response whose status is {@link Response#BAD_REQUEST}
+     */
+    private static Response buildErrorResponse(Request request, Throwable t) {
+        Response badRequest = new Response(request.getId(), request.getVersion());
+        badRequest.setStatus(Response.BAD_REQUEST);
+        badRequest.setErrorMessage(StringUtils.toString(t));
+        return badRequest;
+    }
+
+    private static Codec2 getChannelCodec(URL url) { // picks the codec extension this channel encodes with
+        String name = url.getParameter(Constants.CODEC_KEY);
+        if (StringUtils.isEmpty(name)) {
+            name = url.getProtocol(); // no explicit codec: the extension name must equal the protocol name
+        }
+        FrameworkModel model = getFrameworkModel(url.getScopeModel());
+        // Lookup order: a Codec2 extension, then a Codec wrapped in CodecAdapter, then the Codec2 named "default".
+        if (model.getExtensionLoader(Codec2.class).hasExtension(name)) {
+            return model.getExtensionLoader(Codec2.class).getExtension(name);
+        }
+        if (model.getExtensionLoader(Codec.class).hasExtension(name)) {
+            Codec plainCodec = model.getExtensionLoader(Codec.class).getExtension(name);
+            return new CodecAdapter(plainCodec);
+        }
+        return model.getExtensionLoader(Codec2.class).getExtension("default");
+    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
index 63690a8759..0ab1c129fd 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
@@ -20,16 +20,13 @@ import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.Version;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.exchange.Request;
-import org.apache.dubbo.remoting.exchange.Response;
 
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.handler.timeout.IdleStateEvent;
-import org.apache.dubbo.remoting.exchange.support.MultiMessage;
 
 import static org.apache.dubbo.common.constants.CommonConstants.HEARTBEAT_EVENT;
 
@@ -87,36 +84,6 @@ public class NettyClientHandler extends ChannelDuplexHandler {
     @Override
     public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
         super.write(ctx, msg, promise);
-        final NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
-        final boolean isRequest = msg instanceof Request;
-        final boolean isMultiMessage = msg instanceof MultiMessage;
-
-        // We add listeners to make sure our out bound event is correct.
-        // If our out bound event has an error (in most cases the encoder fails),
-        // we need to have the request return directly instead of blocking the invoke process.
-        promise.addListener(future -> {
-            if (future.isSuccess()) {
-                // if our future is success, mark the future to sent.
-                handler.sent(channel, msg);
-                return;
-            }
-
-            Throwable t = future.cause();
-            if (t != null && isRequest) {
-                Request request = (Request) msg;
-                Response response = buildErrorResponse(request, t);
-                handler.received(channel, response);
-            } else if (t != null && isMultiMessage) {
-                MultiMessage multiMessage = (MultiMessage) msg;
-                for (Object originMessage : multiMessage) {
-                    if (originMessage instanceof Request) {
-                        Request request = (Request) originMessage;
-                        Response response = buildErrorResponse(request, t);
-                        handler.received(channel, response);
-                    }
-                }
-            }
-        });
     }
 
     @Override
@@ -152,17 +119,4 @@ public class NettyClientHandler extends ChannelDuplexHandler {
         }
     }
 
-    /**
-     * build a bad request's response
-     *
-     * @param request the request
-     * @param t       the throwable. In most cases, serialization fails.
-     * @return the response
-     */
-    private static Response buildErrorResponse(Request request, Throwable t) {
-        Response response = new Response(request.getId(), request.getVersion());
-        response.setStatus(Response.BAD_REQUEST);
-        response.setErrorMessage(StringUtils.toString(t));
-        return response;
-    }
 }
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
index beed485b3a..bf029272ab 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyCodecAdapter.java
@@ -26,6 +26,7 @@ import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.dubbo.remoting.exchange.support.MultiMessage;
 
 import java.io.IOException;
 import java.util.List;
@@ -63,10 +64,27 @@ final public class NettyCodecAdapter {
 
         @Override
         protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception {
-            ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
-            Channel ch = ctx.channel();
-            NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
-            codec.encode(channel, buffer, msg);
+            boolean encoded = false; // becomes true once ready-made bytes have been copied into out
+            if (msg instanceof ByteBuf) { // msg is already bytes: copy them through without running the codec
+                out.writeBytes(((ByteBuf) msg)); // NOTE(review): msg is not released here - presumably the MessageToByteEncoder base class does it; confirm
+                encoded = true;
+            } else if (msg instanceof MultiMessage) {
+                for (Object singleMessage : ((MultiMessage) msg)) {
+                    if (singleMessage instanceof ByteBuf) { // elements that are not ByteBuf are skipped by this loop
+                        ByteBuf buf = (ByteBuf) singleMessage;
+                        out.writeBytes(buf);
+                        encoded = true;
+                        buf.release(); // each batched buffer is released right after its bytes are copied
+                    }
+                }
+            }
+
+            if (!encoded) { // no ready-made bytes were found: encode msg with the codec as before
+                ChannelBuffer buffer = new NettyBackedChannelBuffer(out);
+                Channel ch = ctx.channel();
+                NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
+                codec.encode(channel, buffer, msg);
+            }
         }
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelTest.java
index 93add5737c..9a79ee2f2f 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelTest.java
@@ -16,17 +16,19 @@
  */
 package org.apache.dubbo.remoting.transport.netty4;
 
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoop;
+import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.url.component.ServiceConfigURL;
 import org.apache.dubbo.remoting.ChannelHandler;
 import org.apache.dubbo.remoting.RemotingException;
 
 import io.netty.channel.Channel;
-import io.netty.channel.ChannelFuture;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.net.InetSocketAddress;
@@ -78,12 +80,14 @@ public class NettyChannelTest {
     @Test
     public void testSend() throws Exception {
         Mockito.when(channel.eventLoop()).thenReturn(Mockito.mock(EventLoop.class));
+        Mockito.when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT);
         NettyChannel nettyChannel = NettyChannel.getOrAddChannel(channel, url, channelHandler);
         ChannelPromise future = Mockito.mock(ChannelPromise.class);
         Mockito.when(future.await(1000)).thenReturn(true);
         Mockito.when(future.cause()).thenReturn(null);
         Mockito.when(channel.writeAndFlush(Mockito.any())).thenReturn(future);
         Mockito.when(channel.newPromise()).thenReturn(future);
+        Mockito.when(future.addListener(Mockito.any())).thenReturn(future);
         nettyChannel.send("msg", true);
 
         NettyChannel finalNettyChannel = nettyChannel;
@@ -99,6 +103,15 @@ public class NettyChannelTest {
         Assertions.assertThrows(RemotingException.class, () -> {
             finalNettyChannel.send("msg", true);
         }, "in timeout(1000ms) limit");
+
+        ChannelPromise channelPromise = Mockito.mock(ChannelPromise.class);
+        Mockito.when(channel.newPromise()).thenReturn(channelPromise);
+        Mockito.when(channelPromise.await(1000)).thenReturn(true);
+        Mockito.when(channelPromise.cause()).thenReturn(null);
+        Mockito.when(channelPromise.addListener(Mockito.any())).thenReturn(channelPromise);
+        finalNettyChannel.send("msg", true);
+        ArgumentCaptor<GenericFutureListener> listenerArgumentCaptor = ArgumentCaptor.forClass(GenericFutureListener.class);
+        Mockito.verify(channelPromise, Mockito.times(1)).addListener(listenerArgumentCaptor.capture());
     }
 
     @Test
diff --git a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
index ddf2853082..7d5690c8a6 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
+++ b/dubbo-remoting/dubbo-remoting-netty4/src/test/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandlerTest.java
@@ -16,12 +16,12 @@
  */
 package org.apache.dubbo.remoting.transport.netty4;
 
+import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPromise;
 import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.handler.timeout.IdleStateEvent;
-import io.netty.util.concurrent.GenericFutureListener;
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.url.component.ServiceConfigURL;
 import org.apache.dubbo.remoting.ChannelHandler;
@@ -45,11 +45,13 @@ public class NettyClientHandlerTest {
         Mockito.when(ctx.channel()).thenReturn(channel);
         Mockito.when(channel.isActive()).thenReturn(true);
         Mockito.when(channel.eventLoop()).thenReturn(new NioEventLoopGroup().next());
+        Mockito.when(channel.alloc()).thenReturn(PooledByteBufAllocator.DEFAULT);
 
         ChannelPromise future = mock(ChannelPromise.class);
         when(channel.writeAndFlush(any())).thenReturn(future);
         when(future.cause()).thenReturn(null);
         when(channel.newPromise()).thenReturn(future);
+        when(future.addListener(Mockito.any())).thenReturn(future);
 
         NettyClientHandler nettyClientHandler = new NettyClientHandler(url, handler);
 
@@ -78,11 +80,5 @@ public class NettyClientHandlerTest {
         Mockito.verify(channel, Mockito.times(1)).writeAndFlush(requestArgumentCaptor.capture());
 
 
-        Request request = new Request();
-        ChannelPromise promise = Mockito.mock(ChannelPromise.class);
-        nettyClientHandler.write(ctx,request,promise);
-        ArgumentCaptor<GenericFutureListener> listenerArgumentCaptor = ArgumentCaptor.forClass(GenericFutureListener.class);
-        Mockito.verify(promise, Mockito.times(1)).addListener(listenerArgumentCaptor.capture());
-
     }
 }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
index 700687e881..14b7ebb903 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DecodeableRpcInvocation.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.rpc.protocol.dubbo;
 
 
+import org.apache.dubbo.common.utils.CacheableSupplier;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.serialize.Cleanable;
@@ -45,6 +46,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Supplier;
 
 import static org.apache.dubbo.common.BaseServiceMetadata.keyWithoutGroup;
 import static org.apache.dubbo.common.URL.buildKey;
@@ -71,7 +73,7 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
 
     protected final FrameworkModel frameworkModel;
 
-    private CallbackServiceCodec callbackServiceCodec;
+    private Supplier<CallbackServiceCodec> callbackServiceCodecFactory; // supplies the CallbackServiceCodec; get() is called where arguments are decoded
 
     public DecodeableRpcInvocation(FrameworkModel frameworkModel, Channel channel, Request request, InputStream is, byte id) {
         this.frameworkModel = frameworkModel;
@@ -82,7 +84,8 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
         this.request = request;
         this.inputStream = is;
         this.serializationType = id;
-        callbackServiceCodec = new CallbackServiceCodec(frameworkModel);
+        this.callbackServiceCodecFactory = CacheableSupplier.newSupplier(()->
+            new CallbackServiceCodec(frameworkModel));
     }
 
     @Override
@@ -220,6 +223,7 @@ public class DecodeableRpcInvocation extends RpcInvocation implements Codec, Dec
             }
 
             //decode argument ,may be callback
+            CallbackServiceCodec callbackServiceCodec = callbackServiceCodecFactory.get();
             for (int i = 0; i < args.length; i++) {
                 args[i] = callbackServiceCodec.decodeInvocationArgument(channel, this, pts, i, args[i]);
             }