You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/05/01 06:12:52 UTC
[dubbo] branch master updated: support timeout pass and count down
on RPC call chain. (#6008)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new f607690 support timeout pass and count down on RPC call chain. (#6008)
f607690 is described below
commit f607690663924a61310f63831d206122dd455adb
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri May 1 14:12:38 2020 +0800
support timeout pass and count down on RPC call chain. (#6008)
fixes #4041
---
.../ConsumerContextClusterInterceptor.java | 7 +-
.../dubbo/common/constants/CommonConstants.java | 4 +
.../remoting/exchange/support/DefaultFuture.java | 4 +-
.../java/org/apache/dubbo/rpc/AppResponse.java | 6 ++
.../main/java/org/apache/dubbo/rpc/RpcContext.java | 9 +++
.../java/org/apache/dubbo/rpc/RpcException.java | 1 +
.../org/apache/dubbo/rpc/TimeoutCountDown.java | 90 ++++++++++++++++++++++
.../dubbo/rpc/filter/ConsumerContextFilter.java | 18 ++++-
.../org/apache/dubbo/rpc/filter/ContextFilter.java | 11 ++-
.../org/apache/dubbo/rpc/filter/TimeoutFilter.java | 21 +++--
.../apache/dubbo/rpc/protocol/AbstractInvoker.java | 1 +
.../org/apache/dubbo/rpc/support/RpcUtils.java | 38 +++++++++
.../dubbo/rpc/protocol/dubbo/DubboInvoker.java | 18 ++++-
13 files changed, 210 insertions(+), 18 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
index 468184a..4beb7a0 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/interceptor/ConsumerContextClusterInterceptor.java
@@ -29,9 +29,8 @@ public class ConsumerContextClusterInterceptor implements ClusterInterceptor, Cl
@Override
public void before(AbstractClusterInvoker<?> invoker, Invocation invocation) {
- RpcContext.getContext()
- .setInvocation(invocation)
- .setLocalAddress(NetUtils.getLocalHost(), 0);
+ RpcContext context = RpcContext.getContext();
+ context.setInvocation(invocation).setLocalAddress(NetUtils.getLocalHost(), 0);
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
@@ -40,7 +39,7 @@ public class ConsumerContextClusterInterceptor implements ClusterInterceptor, Cl
@Override
public void after(AbstractClusterInvoker<?> clusterInvoker, Invocation invocation) {
- RpcContext.removeContext();
+ RpcContext.removeContext(true);
}
@Override
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
index fe410e2..27c5744 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/CommonConstants.java
@@ -110,6 +110,10 @@ public interface CommonConstants {
int DEFAULT_TIMEOUT = 1000;
+ String TIME_COUNTDOWN_KEY = "timeout-countdown";
+
+ String ENABLE_TIMEOUT_COUNTDOWN_KEY = "enable-timeout-countdown";
+
String REMOVE_VALUE_PREFIX = "-";
String PROPERTIES_CHAR_SEPERATOR = "-";
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
index a18d116..e5acf6d 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/support/DefaultFuture.java
@@ -176,9 +176,9 @@ public class DefaultFuture extends CompletableFuture<Object> {
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
- + ", response " + response
+ + ", response status is " + response.getStatus()
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
- + " -> " + channel.getRemoteAddress()));
+ + " -> " + channel.getRemoteAddress()) + ", please check provider side for detailed result.");
}
} finally {
CHANNELS.remove(response.getId());
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
index 9f26dd0..7ebff3f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/AppResponse.java
@@ -232,6 +232,12 @@ public class AppResponse implements Result {
throw new UnsupportedOperationException("AppResponse represents an concrete business response, there will be no status changes, you should get internal values directly.");
}
+ public void clear() {
+ this.result = null;
+ this.exception = null;
+ this.attachments.clear();
+ }
+
@Override
public String toString() {
return "AppResponse [value=" + result + ", exception=" + exception + "]";
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 99927d7..66ac06f 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -158,6 +158,15 @@ public class RpcContext {
* @see org.apache.dubbo.rpc.filter.ContextFilter
*/
public static void removeContext() {
+ removeContext(false);
+ }
+
+ /**
+ * customized for internal use.
+ *
+ * @param checkCanRemove if need check before remove
+ */
+ public static void removeContext(boolean checkCanRemove) {
if (LOCAL.get().canRemove()) {
LOCAL.remove();
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
index 88e5ee4..a95165a 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcException.java
@@ -36,6 +36,7 @@ public /**final**/ class RpcException extends RuntimeException {
public static final int SERIALIZATION_EXCEPTION = 5;
public static final int NO_INVOKER_AVAILABLE_AFTER_FILTER = 6;
public static final int LIMIT_EXCEEDED_EXCEPTION = 7;
+ public static final int TIMEOUT_TERMINATE = 8;
private static final long serialVersionUID = 7815426752583648734L;
/**
* RpcException cannot be extended, use error code for exception type to keep compatibility
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/TimeoutCountDown.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/TimeoutCountDown.java
new file mode 100644
index 0000000..91b561d
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/TimeoutCountDown.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc;
+
+import java.util.concurrent.TimeUnit;
+
+public final class TimeoutCountDown implements Comparable<TimeoutCountDown> {
+
+ public static TimeoutCountDown newCountDown(long timeout, TimeUnit unit) {
+ return new TimeoutCountDown(timeout, unit);
+ }
+
+ private final long timeoutInMillis;
+ private final long deadlineInNanos;
+ private volatile boolean expired;
+
+ private TimeoutCountDown(long timeout, TimeUnit unit) {
+ timeoutInMillis = TimeUnit.MILLISECONDS.convert(timeout, unit);
+ deadlineInNanos = System.nanoTime() + TimeUnit.NANOSECONDS.convert(timeout, unit);
+ }
+
+ public long getTimeoutInMilli() {
+ return timeoutInMillis;
+ }
+
+ public boolean isExpired() {
+ if (!expired) {
+ if (deadlineInNanos - System.nanoTime() <= 0) {
+ expired = true;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public long timeRemaining(TimeUnit unit) {
+ final long currentNanos = System.nanoTime();
+ if (!expired && deadlineInNanos - currentNanos <= 0) {
+ expired = true;
+ }
+ return unit.convert(deadlineInNanos - currentNanos, TimeUnit.NANOSECONDS);
+ }
+
+ public long elapsedMillis() {
+ if (isExpired()) {
+ return timeoutInMillis + TimeUnit.MILLISECONDS.convert(System.nanoTime() - deadlineInNanos, TimeUnit.NANOSECONDS);
+ } else {
+ return TimeUnit.MILLISECONDS.convert(deadlineInNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
+ }
+ }
+
+ @Override
+ public String toString() {
+ long timeoutMillis = TimeUnit.MILLISECONDS.convert(deadlineInNanos, TimeUnit.NANOSECONDS);
+ long remainingMillis = timeRemaining(TimeUnit.MILLISECONDS);
+
+ StringBuilder buf = new StringBuilder();
+ buf.append("Total timeout value - ");
+ buf.append(timeoutMillis);
+ buf.append(", times remaining - ");
+ buf.append(remainingMillis);
+ return buf.toString();
+ }
+
+ @Override
+ public int compareTo(TimeoutCountDown another) {
+ long delta = this.deadlineInNanos - another.deadlineInNanos;
+ if (delta < 0) {
+ return -1;
+ } else if (delta > 0) {
+ return 1;
+ }
+ return 0;
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
index c935398..5aad186 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ConsumerContextFilter.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.rpc.filter;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -25,10 +26,12 @@ import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
/**
* ConsumerContextFilter set current RpcContext with invoker,invocation, local host, remote host and port
@@ -42,8 +45,8 @@ public class ConsumerContextFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
- RpcContext.getContext()
- .setInvoker(invoker)
+ RpcContext context = RpcContext.getContext();
+ context.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort())
@@ -52,6 +55,17 @@ public class ConsumerContextFilter implements Filter {
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
+
+ // pass default timeout set by end user (ReferenceConfig)
+ Object countDown = context.get(TIME_COUNTDOWN_KEY);
+ if (countDown != null) {
+ TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countDown;
+ if (timeoutCountDown.isExpired()) {
+ return AsyncRpcResult.newDefaultAsyncResult(new RpcException(RpcException.TIMEOUT_TERMINATE,
+ "No time left for making the following call: " + invocation.getServiceName() + "."
+ + invocation.getMethodName() + ", terminate directly."), invocation);
+ }
+ }
return invoker.invoke(invocation);
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
index ef3f5bc..0274474 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ContextFilter.java
@@ -25,11 +25,14 @@ import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
+import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
@@ -38,6 +41,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
import static org.apache.dubbo.rpc.Constants.FORCE_USE_TAG;
@@ -113,13 +117,18 @@ public class ContextFilter implements Filter, Filter.Listener {
((RpcInvocation) invocation).setInvoker(invoker);
}
+ long timeout = RpcUtils.getTimeout(invocation, -1);
+ if (timeout != -1) {
+ context.set(TIME_COUNTDOWN_KEY, TimeoutCountDown.newCountDown(timeout, TimeUnit.MILLISECONDS));
+ }
+
try {
context.clearAfterEachInvoke(false);
return invoker.invoke(invocation);
} finally {
context.clearAfterEachInvoke(true);
// IMPORTANT! For async scenario, we must remove context from current thread, so we always create a new RpcContext for the next invoke for the same thread.
- RpcContext.removeContext();
+ RpcContext.removeContext(true);
RpcContext.removeServerContext();
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TimeoutFilter.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TimeoutFilter.java
index f828d4f..1dfd8ed 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TimeoutFilter.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/TimeoutFilter.java
@@ -20,14 +20,19 @@ import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Filter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.TimeoutCountDown;
import java.util.Arrays;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
+
/**
* Log any invocation timeout, but don't stop server from running
*/
@@ -36,22 +41,22 @@ public class TimeoutFilter implements Filter, Filter.Listener {
private static final Logger logger = LoggerFactory.getLogger(TimeoutFilter.class);
- private static final String TIMEOUT_FILTER_START_TIME = "timeout_filter_start_time";
-
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
- invocation.put(TIMEOUT_FILTER_START_TIME, System.currentTimeMillis());
return invoker.invoke(invocation);
}
@Override
public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
- Object startTime = invocation.get(TIMEOUT_FILTER_START_TIME);
- if (startTime != null) {
- long elapsed = System.currentTimeMillis() - (Long) startTime;
- if (invoker.getUrl() != null && elapsed > invoker.getUrl().getMethodParameter(invocation.getMethodName(), "timeout", Integer.MAX_VALUE)) {
+ Object obj = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
+ if (obj != null) {
+ TimeoutCountDown countDown = (TimeoutCountDown) obj;
+ if (countDown.isExpired()) {
+ ((AppResponse) appResponse).clear(); // clear response in case of timeout.
if (logger.isWarnEnabled()) {
- logger.warn("invoke time out. method: " + invocation.getMethodName() + " arguments: " + Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() + ", invoke elapsed " + elapsed + " ms.");
+ logger.warn("invoke timed out. method: " + invocation.getMethodName() + " arguments: " +
+ Arrays.toString(invocation.getArguments()) + " , url is " + invoker.getUrl() +
+ ", invoke elapsed " + countDown.elapsedMillis() + " ms.");
}
}
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
index 23eaeb6..90e101d 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/protocol/AbstractInvoker.java
@@ -143,6 +143,7 @@ public abstract class AbstractInvoker<T> implements Invoker<T> {
if (CollectionUtils.isNotEmptyMap(attachment)) {
invocation.addObjectAttachmentsIfAbsent(attachment);
}
+
Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
if (CollectionUtils.isNotEmptyMap(contextAttachments)) {
/**
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
index c65562b..9df1eea 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/support/RpcUtils.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.common.utils.ReflectUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.InvokeMode;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.service.GenericService;
@@ -33,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE;
import static org.apache.dubbo.common.constants.CommonConstants.$INVOKE_ASYNC;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
import static org.apache.dubbo.rpc.Constants.$ECHO;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
import static org.apache.dubbo.rpc.Constants.AUTO_ATTACH_INVOCATIONID_KEY;
@@ -211,4 +213,40 @@ public class RpcUtils {
}
return method;
}
+
+ public static long getTimeout(Invocation invocation, long defaultTimeout) {
+ long timeout = defaultTimeout;
+ Object genericTimeout = invocation.getObjectAttachment(TIMEOUT_KEY);
+ if (genericTimeout != null) {
+ timeout = convertToNumber(genericTimeout, defaultTimeout);
+ }
+ return timeout;
+ }
+
+ public static long getTimeout(URL url, String methodName, RpcContext context, long defaultTimeout) {
+ long timeout = defaultTimeout;
+ Object genericTimeout = context.getObjectAttachment(TIMEOUT_KEY);
+ if (genericTimeout != null) {
+ timeout = convertToNumber(genericTimeout, defaultTimeout);
+ } else if (url != null) {
+ timeout = url.getMethodPositiveParameter(methodName, TIMEOUT_KEY, defaultTimeout);
+ }
+ return timeout;
+ }
+
+ private static long convertToNumber(Object obj, long defaultTimeout) {
+ long timeout = 0;
+ try {
+ if (obj instanceof String) {
+ timeout = Long.parseLong((String) obj);
+ } else if (obj instanceof Number) {
+ timeout = ((Number) obj).longValue();
+ } else {
+ timeout = Long.parseLong(obj.toString());
+ }
+ } catch (Exception e) {
+ // ignore
+ }
+ return timeout;
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
index ef7a110..5ef50aa 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java
@@ -29,21 +29,26 @@ import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
+import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_TIMEOUT;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLE_TIMEOUT_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIME_COUNTDOWN_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
@@ -89,7 +94,18 @@ public class DubboInvoker<T> extends AbstractInvoker<T> {
}
try {
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
- int timeout = getUrl().getMethodPositiveParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT);
+ Object countdown = RpcContext.getContext().get(TIME_COUNTDOWN_KEY);
+ int timeout = DEFAULT_TIMEOUT;
+ if (countdown == null) {
+ timeout = (int) RpcUtils.getTimeout(getUrl(), methodName, RpcContext.getContext(), DEFAULT_TIMEOUT);
+ if (getUrl().getParameter(ENABLE_TIMEOUT_COUNTDOWN_KEY, false)) {
+ invocation.setObjectAttachment(TIMEOUT_KEY, timeout); // pass timeout to remote server
+ }
+ } else {
+ TimeoutCountDown timeoutCountDown = (TimeoutCountDown) countdown;
+ timeout = (int) timeoutCountDown.timeRemaining(TimeUnit.MILLISECONDS);
+ invocation.setObjectAttachment(TIMEOUT_KEY, timeout);// pass timeout to remote server
+ }
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);