You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/05/01 06:12:52 UTC

[dubbo] branch master updated: support timeout pass and count down on RPC call chain. (#6008)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new f607690  support timeout pass and count down on RPC call chain. (#6008)
f607690 is described below

commit f607690663924a61310f63831d206122dd455adb
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri May 1 14:12:38 2020 +0800

    support timeout pass and count down on RPC call chain. (#6008)
    
    fixes #4041
---
 .../ConsumerContextClusterInterceptor.java         |  7 +-
 .../dubbo/common/constants/CommonConstants.java    |  4 +
 .../remoting/exchange/support/DefaultFuture.java   |  4 +-
 .../java/org/apache/dubbo/rpc/AppResponse.java     |  6 ++
 .../main/java/org/apache/dubbo/rpc/RpcContext.java |  9 +++
 .../java/org/apache/dubbo/rpc/RpcException.java    |  1 +
 .../org/apache/dubbo/rpc/TimeoutCountDown.java     | 90 ++++++++++++++++++++++
 .../dubbo/rpc/filter/ConsumerContextFilter.java    | 18 ++++-
 .../org/apache/dubbo/rpc/filter/ContextFilter.java | 11 ++-
 .../org/apache/dubbo/rpc/filter/TimeoutFilter.java | 21 +++--
 .../apache/dubbo/rpc/protocol/AbstractInvoker.java |  1 +
 .../org/apache/dubbo/rpc/support/RpcUtils.java     | 38 +++++++++
 .../dubbo/rpc/protocol/dubbo/DubboInvoker.java     | 18 ++++-
 13 files changed, 210 insertions(+), 18 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
index 468184a..4beb7a0 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
@@ -29,9 +29,8 @@ public class ConsumerContextClusterInterceptor implements ClusterInterceptor, Cl
 
     @Override
     public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
-        RpcContext.getContext()
-                .setInvocation(invocation)
-                .setLocalAddress(NetUtils.getLocalHost(), 0);
+        RpcContext context = RpcContext.getContext();
+        context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);
         if (invocation instanceof RpcInvocation) {
             ((RpcInvocation) invocation).setInvoker(invoker);
         }
@@ -40,7 +39,7 @@ public class ConsumerContextClusterInterceptor implements ClusterInterceptor, Cl
 
     @Override
     public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
-        RpcContext.removeContext();
+        RpcContext.removeContext(true);
     }
 
     @Override
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index fe410e2..27c5744 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -110,6 +110,10 @@ public interface CommonConstants {
 
     int DEFAULT_TIMEOUT = 1000;
 
+    String TIME_COUNTDOWN_KEY = "timeout-countdown";
+
+    String ENABLE_TIMEOUT_COUNTDOWN_KEY = "enable-timeout-countdown";
+
     String REMOVE_VALUE_PREFIX = "-";
 
     String PROPERTIES_CHAR_SEPERATOR = "-";
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index a18d116..e5acf6d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -176,9 +176,9 @@ public class DefaultFuture extends CompletableFuture<Object> {
             } else {
                 logger.warn("The timeout response finally returned at "
                         + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
-                        + ", response " + response
+                        + ", response status is " + response.getStatus()
                         + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
-                        + " -> " + channel.getRemoteAddress()));
+                        + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
             }
         } finally {
             CHANNELS.remove(response.getId());
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
index 9f26dd0..7ebff3f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
@@ -232,6 +232,12 @@ public class AppResponse implements Result {
         throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
     }
 
+    public void clear() { // resets this response: result and exception are dropped, attachments emptied
+        this.result = null;
+        this.exception = null;
+        this.attachments.clear(); // clears the existing attachments object in place; the field still refers to it
+    }
+
     @Override
     public String toString() {
         return "AppResponse [value=" + result + ", exception=" + exception + "]";
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 99927d7..66ac06f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -158,6 +158,15 @@ public class RpcContext {
      * @see org.apache.dubbo.rpc.filter.ContextFilter
      */
     public static void removeContext() {
+        removeContext(false);
+    }
+
+    /**
+     * customized for internal use.
+     *
+     * @param checkCanRemove whether to check that the context can be removed before removing it
+     */
+    public static void removeContext(boolean checkCanRemove) {
         if (LOCAL.get().canRemove()) {
             LOCAL.remove();
         }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
index 88e5ee4..a95165a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
@@ -36,6 +36,7 @@ public /**final**/ class RpcException extends RuntimeException {
     public static final int SERIALIZATION_EXCEPTION = 5;
     public static final int NO_INVOKER_AVAILABLE_AFTER_FILTER = 6;
     public static final int LIMIT_EXCEEDED_EXCEPTION = 7;
+    public static final int TIMEOUT_TERMINATE = 8;
     private static final long serialVersionUID = 7815426752583648734L;
     /**
      * RpcException cannot be extended, use error code for exception type to keep compatibility
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/TimeoutCountDown.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/TimeoutCountDown.java
new file mode 100644
index 0000000..91b561d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/TimeoutCountDown.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import java.util.concurrent.TimeUnit;
+/** Countdown over a timeout budget: fixes a deadline at creation and reports the time left or already used. */
+public final class TimeoutCountDown implements Comparable<TimeoutCountDown> {
+  /** Creates a countdown that starts now and runs for {@code timeout}, expressed in {@code unit}. */
+  public static TimeoutCountDown newCountDown(long timeout, TimeUnit unit) {
+    return new TimeoutCountDown(timeout, unit);
+  }
+  // Budget in milliseconds, deadline on the System.nanoTime() scale, and a flag set once expiry is observed.
+  private final long timeoutInMillis;
+  private final long deadlineInNanos;
+  private volatile boolean expired;
+  /** Stores the budget in milliseconds and sets the deadline to the current nanoTime plus the budget. */
+  private TimeoutCountDown(long timeout, TimeUnit unit) {
+    timeoutInMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
+    deadlineInNanos = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit);
+  }
+  /** Returns the configured timeout converted to milliseconds. */
+  public long getTimeoutInMilli() {
+    return timeoutInMillis;
+  }
+  /** Returns {@code true} once the deadline has been reached; after that the answer is cached and stays true. */
+  public boolean isExpired() {
+    if (!expired) {
+      if (deadlineInNanos - System.nanoTime() <= 0) {
+        expired = true;
+      } else {
+        return false;
+      }
+    }
+    return true;
+  }
+  /** Returns the time left until the deadline in {@code unit}; zero or negative once the deadline has passed. */
+  public long timeRemaining(TimeUnit unit) {
+    final long currentNanos = System.nanoTime();
+    if (!expired && deadlineInNanos - currentNanos <= 0) {
+      expired = true;
+    }
+    return unit.convert(deadlineInNanos - currentNanos, TimeUnit.NANOSECONDS);
+  }
+  /** Returns the milliseconds used so far out of the budget; larger than the timeout once it has expired. */
+  public long elapsedMillis() {
+    if (isExpired()) {
+      return timeoutInMillis + TimeUnit.MILLISECONDS.convert(System.nanoTime() - deadlineInNanos, TimeUnit.NANOSECONDS);
+    }
+    // still running: elapsed time is the budget minus the time that is left, not the time left itself
+    return timeoutInMillis - TimeUnit.MILLISECONDS.convert(deadlineInNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
+  }
+  /** Describes the configured timeout and the time remaining, both in milliseconds. */
+  @Override
+  public String toString() {
+    // print the configured budget; the deadline is a raw nanoTime reading and is meaningless as a duration
+    long remainingMillis = timeRemaining(TimeUnit.MILLISECONDS);
+
+    StringBuilder buf = new StringBuilder();
+    buf.append("Total timeout value - ");
+    buf.append(timeoutInMillis);
+    buf.append(", times remaining - ");
+    buf.append(remainingMillis);
+    return buf.toString();
+  }
+  /** Orders countdowns by deadline, earliest first. */
+  @Override
+  public int compareTo(TimeoutCountDown another) {
+    long delta = this.deadlineInNanos - another.deadlineInNanos;
+    if (delta < 0) {
+      return -1;
+    } else if (delta > 0) {
+      return 1;
+    }
+    return 0;
+  }
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
index c935398..5aad186 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.filter;
 
 import org.apache.dubbo.common.extension.Activate;
 import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.rpc.AsyncRpcResult;
 import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
@@ -25,10 +26,12 @@ import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
 
 import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
 import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
 
 /**
  * ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port
@@ -42,8 +45,8 @@ public class ConsumerContextFilter implements Filter {
 
     @Override
     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
-        RpcContext.getContext()
-                .setInvoker(invoker)
+        RpcContext context = RpcContext.getContext();
+        context.setInvoker(invoker)
                 .setInvocation(invocation)
                 .setLocalAddress(NetUtils.getLocalHost(), 0)
                 .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
@@ -52,6 +55,17 @@ public class ConsumerContextFilter implements Filter {
         if (invocation instanceof RpcInvocation) {
             ((RpcInvocation) invocation).setInvoker(invoker);
         }
+
+        // pass default timeout set by end user (ReferenceConfig)
+        Object countDown = context.get(TIME_COUNTDOWN_KEY);
+        if (countDown != null) {
+            TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
+            if (timeoutCountDown.isExpired()) {
+                return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
+                        "No time left for making the following call: " + invocation.getServiceName() + "."
+                                + invocation.getMethodName() + ", terminate directly."), invocation);
+            }
+        }
         return invoker.invoke(invocation);
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index ef3f5bc..0274474 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -25,11 +25,14 @@ import org.apache.dubbo.rpc.Result;
 import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
+import org.apache.dubbo.rpc.support.RpcUtils;
 
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
@@ -38,6 +41,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
 import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
 import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
 import static org.apache.dubbo.rpc.Constants.FORCE_USE_TAG;
@@ -113,13 +117,18 @@ public class ContextFilter implements Filter, Filter.Listener {
             ((RpcInvocation) invocation).setInvoker(invoker);
         }
 
+        long timeout = RpcUtils.getTimeout(invocation, -1);
+        if (timeout != -1) {
+            context.set(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS));
+        }
+
         try {
             context.clearAfterEachInvoke(false);
             return invoker.invoke(invocation);
         } finally {
             context.clearAfterEachInvoke(true);
             // IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
-            RpcContext.removeContext();
+            RpcContext.removeContext(true);
             RpcContext.removeServerContext();
         }
     }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TimeoutFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TimeoutFilter.java
index f828d4f..1dfd8ed 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TimeoutFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TimeoutFilter.java
@@ -20,14 +20,19 @@ import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.extension.Activate;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.AppResponse;
 import org.apache.dubbo.rpc.Filter;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.TimeoutCountDown;
 
 import java.util.Arrays;
 
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
+
 /**
  * Log any invocation timeout, but don't stop server from running
  */
@@ -36,22 +41,22 @@ public class TimeoutFilter implements Filter, Filter.Listener {
 
     private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);
 
-    private static final String TIMEOUT_FILTER_START_TIME = "timeout_filter_start_time";
-
     @Override
     public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
-        invocation.put(TIMEOUT_FILTER_START_TIME, System.currentTimeMillis());
         return invoker.invoke(invocation);
     }
 
     @Override
     public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
-        Object startTime = invocation.get(TIMEOUT_FILTER_START_TIME);
-        if (startTime != null) {
-            long elapsed = System.currentTimeMillis() - (Long) startTime;
-            if (invoker.getUrl() != null && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) {
+        Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
+        if (obj != null) {
+            TimeoutCountDown countDown = (TimeoutCountDown) obj;
+            if (countDown.isExpired()) {
+                ((AppResponse) appResponse).clear(); // clear response in case of timeout.
                 if (logger.isWarnEnabled()) {
-                    logger.warn("invoke time out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
+                    logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
+                            Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
+                            ", invoke elapsed " + countDown.elapsedMillis() + " ms.");
                 }
             }
         }
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 23eaeb6..90e101d 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -143,6 +143,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
         if (CollectionUtils.isNotEmptyMap(attachment)) {
             invocation.addObjectAttachmentsIfAbsent(attachment);
         }
+
         Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
         if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
             /**
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
index c65562b..9df1eea 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.utils.ReflectUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.InvokeMode;
+import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcInvocation;
 import org.apache.dubbo.rpc.service.GenericService;
 
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE;
 import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE_ASYNC;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
 import static org.apache.dubbo.rpc.Constants.$ECHO;
 import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
 import static org.apache.dubbo.rpc.Constants.AUTO_ATTACH_INVOCATIONID_KEY;
@@ -211,4 +213,40 @@ public class RpcUtils {
         }
         return method;
     }
+    /** Returns the timeout attached to {@code invocation} under TIMEOUT_KEY, or {@code defaultTimeout} if none is attached. */
+    public static long getTimeout(Invocation invocation, long defaultTimeout) {
+        Object attached = invocation.getObjectAttachment(TIMEOUT_KEY);
+        if (attached == null) {
+            return defaultTimeout;
+        }
+        // the attachment may be a String or a Number; convertToNumber handles both
+        return convertToNumber(attached, defaultTimeout);
+    }
+    /** Resolves the timeout: a TIMEOUT_KEY attachment on {@code context} wins, then the url's method parameter, then the default. */
+    public static long getTimeout(URL url, String methodName, RpcContext context, long defaultTimeout) {
+        Object attached = context.getObjectAttachment(TIMEOUT_KEY);
+        if (attached != null) {
+            return convertToNumber(attached, defaultTimeout);
+        }
+        if (url == null) {
+            return defaultTimeout;
+        }
+        return url.getMethodPositiveParameter(methodName, TIMEOUT_KEY, defaultTimeout);
+    }
+    /** Reads {@code obj} as a long (Number via longValue, anything else via its text); returns {@code defaultTimeout} if that fails. */
+    private static long convertToNumber(Object obj, long defaultTimeout) {
+        if (obj instanceof Number) {
+            return ((Number) obj).longValue();
+        }
+        if (obj == null) {
+            return defaultTimeout;
+        }
+        try {
+            // String.toString() returns the string itself, so one parse path covers text and any other type
+            return Long.parseLong(obj.toString());
+        } catch (RuntimeException e) {
+            // best effort: an unparsable value falls back to the caller's default instead of becoming 0
+            return defaultTimeout;
+        }
+    }
 }
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index ef7a110..5ef50aa 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -29,21 +29,26 @@ import org.apache.dubbo.rpc.FutureContext;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
 import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
 import org.apache.dubbo.rpc.protocol.AbstractInvoker;
 import org.apache.dubbo.rpc.support.RpcUtils;
 
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
 import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
 
@@ -89,7 +94,18 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
         }
         try {
             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
-            int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
+            Object countdown = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
+            int timeout = DEFAULT_TIMEOUT;
+            if (countdown == null) {
+                timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getContext(), DEFAULT_TIMEOUT);
+                if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
+                    invocation.setObjectAttachment(TIMEOUT_KEY, timeout); // pass timeout to remote server
+                }
+            } else {
+                TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
+                timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
+                invocation.setObjectAttachment(TIMEOUT_KEY, timeout);// pass timeout to remote server
+            }
             if (isOneway) {
                 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                 currentClient.send(inv, isSent);