You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by gu...@apache.org on 2021/09/27 02:14:51 UTC
[dubbo] branch 3.0 updated: [3.0-Triple] Add handler for reset
frame (#8871)
This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 84346e7 [3.0-Triple] Add handler for reset frame (#8871)
84346e7 is described below
commit 84346e7ac5652a43d30c2dc97380de059b0b9ffd
Author: earthchen <yo...@duobei.com>
AuthorDate: Mon Sep 27 10:14:36 2021 +0800
[3.0-Triple] Add handler for reset frame (#8871)
* handler rst frame
* fix license
* fix style
* Move CancelContext into ServiceContext
* optimize rst frame handler
* change CancellationContext sync method
* remove unreachable code
* fix checkstyle
---
.../org/apache/dubbo/rpc/CancellationContext.java | 111 +++++++++++++++++++++
.../apache/dubbo/rpc/CancellationListener.java} | 18 ++--
.../org/apache/dubbo/rpc/ExecutableListener.java | 69 +++++++++++++
.../main/java/org/apache/dubbo/rpc/RpcContext.java | 16 ++-
.../java/org/apache/dubbo/rpc/RpcInvocation.java | 10 ++
.../org/apache/dubbo/rpc/RpcServiceContext.java | 1 -
.../rpc/protocol/tri/AbstractClientStream.java | 12 ++-
.../rpc/protocol/tri/AbstractServerStream.java | 16 ++-
.../dubbo/rpc/protocol/tri/AbstractStream.java | 66 ++++++++++--
.../rpc/protocol/tri/ClientTransportObserver.java | 80 ++++++++++-----
.../dubbo/rpc/protocol/tri/ServerStream.java | 3 +-
.../rpc/protocol/tri/ServerTransportObserver.java | 42 ++++++--
.../dubbo/rpc/protocol/tri/TransportObserver.java | 8 +-
.../rpc/protocol/tri/TripleClientHandler.java | 9 ++
.../tri/TripleHttp2ClientResponseHandler.java | 10 ++
.../tri/TripleHttp2FrameServerHandler.java | 111 ++++++++++++---------
.../rpc/protocol/tri/TripleHttp2Protocol.java | 37 +++----
.../dubbo/rpc/protocol/tri/TripleInvoker.java | 9 +-
.../tri/TripleServerConnectionHandler.java | 5 +
.../dubbo/rpc/protocol/tri/UnaryClientStream.java | 13 +--
.../dubbo/rpc/protocol/tri/UnaryServerStream.java | 3 +-
21 files changed, 518 insertions(+), 131 deletions(-)
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/CancellationContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/CancellationContext.java
new file mode 100644
index 0000000..c341551
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/CancellationContext.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Executor;
+
+public class CancellationContext implements Closeable {
+
+ private ArrayList<ExecutableListener> listeners;
+ private Throwable cancellationCause;
+ private boolean cancelled;
+
+ public void addListener(
+ final CancellationListener cancellationListener, final Executor executor) {
+ addListener(cancellationListener, executor, null);
+ }
+
+ public void addListener(
+ final CancellationListener cancellationListener) {
+ addListener(cancellationListener, Runnable::run, null);
+ }
+
+ public void addListener(
+ final CancellationListener cancellationListener,
+ final RpcServiceContext context) {
+ addListener(cancellationListener, Runnable::run, context);
+ }
+
+ public void addListener(
+ final CancellationListener cancellationListener,
+ final Executor executor,
+ final RpcServiceContext context) {
+ addListenerInternal(new ExecutableListener(executor, cancellationListener, context));
+ }
+
+ public synchronized void addListenerInternal(ExecutableListener executableListener) {
+ if (isCancelled()) {
+ executableListener.deliver();
+ } else {
+ if (listeners == null) {
+ listeners = new ArrayList<>();
+ }
+ listeners.add(executableListener);
+ }
+ }
+
+ public boolean cancel(Throwable cause) {
+ boolean triggeredCancel = false;
+ synchronized (this) {
+ if (!cancelled) {
+ cancelled = true;
+ this.cancellationCause = cause;
+ triggeredCancel = true;
+ }
+ }
+ if (triggeredCancel) {
+ notifyAndClearListeners();
+ }
+ return triggeredCancel;
+ }
+
+ private void notifyAndClearListeners() {
+ ArrayList<ExecutableListener> tmpListeners;
+ synchronized (this) {
+ if (listeners == null) {
+ return;
+ }
+ tmpListeners = listeners;
+ listeners = null;
+ }
+ for (ExecutableListener tmpListener : tmpListeners) {
+ tmpListener.deliver();
+ }
+ }
+
+ public synchronized boolean isCancelled() {
+ return cancelled;
+ }
+
+ public List<ExecutableListener> getListeners() {
+ return listeners;
+ }
+
+ public Throwable getCancellationCause() {
+ return cancellationCause;
+ }
+
+ @Override
+ public void close() throws IOException {
+ cancel(null);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/CancellationListener.java
similarity index 73%
copy from dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
copy to dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/CancellationListener.java
index 64d833b..5f32510 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/CancellationListener.java
@@ -15,14 +15,18 @@
* limitations under the License.
*/
-package org.apache.dubbo.rpc.protocol.tri;
+package org.apache.dubbo.rpc;
-public interface TransportObserver {
-
- void onMetadata(Metadata metadata, boolean endStream);
-
- void onData(byte[] data, boolean endStream);
+/**
+ * A listener notified on context cancellation.
+ */
+public interface CancellationListener {
- default void onComplete(){}
+ /**
+ * Notifies that a context was cancelled.
+ *
+ * @param context the newly cancelled context.
+ */
+ void cancelled(RpcServiceContext context);
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ExecutableListener.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ExecutableListener.java
new file mode 100644
index 0000000..c263bf8
--- /dev/null
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/ExecutableListener.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dubbo.rpc;
+
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
+import java.util.concurrent.Executor;
+
+
+public class ExecutableListener implements Runnable {
+
+ private static final Logger log = LoggerFactory.getLogger(ExecutableListener.class);
+
+ private final Executor executor;
+ private final CancellationListener listener;
+ private final RpcServiceContext context;
+
+ public ExecutableListener(Executor executor, CancellationListener listener, RpcServiceContext context) {
+ this.executor = executor;
+ this.listener = listener;
+ this.context = context;
+ }
+
+ public ExecutableListener(Executor executor, CancellationListener listener) {
+ this(executor, listener, null);
+ }
+
+
+ public void deliver() {
+ try {
+ executor.execute(this);
+ } catch (Throwable t) {
+ log.warn("Exception notifying context listener", t);
+ }
+ }
+
+ public Executor getExecutor() {
+ return executor;
+ }
+
+ public CancellationListener getListener() {
+ return listener;
+ }
+
+ public RpcServiceContext getContext() {
+ return context;
+ }
+
+ @Override
+ public void run() {
+ listener.cancelled(context);
+ }
+}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index f88a7a9..d1e15ee 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -38,7 +38,7 @@ import java.util.concurrent.Future;
* There are four kinds of RpcContext, which are ServerContext, ClientAttachment, ServerAttachment and ServiceContext.
* <p/>
* ServiceContext: Using to pass environment parameters in the whole invocation. For example, `remotingApplicationName`,
- * `remoteAddress`, etc. {@link RpcServiceContext}
+ * `remoteAddress`, etc. {@link RpcServiceContext}
* ClientAttachment, ServerAttachment and ServiceContext are using to transfer attachments.
* Imaging a situation like this, A is calling B, and B will call C, after that, B wants to return some attachments back to A.
* ClientAttachment is using to pass attachments to next hop as a consumer. ( A --> B , in A side)
@@ -84,6 +84,19 @@ public class RpcContext {
}
};
+ private static final InternalThreadLocal<CancellationContext> CANCELLATION_CONTEXT = new InternalThreadLocal<CancellationContext>() {
+ @Override
+ protected CancellationContext initialValue() {
+ return new CancellationContext();
+ }
+ };
+
+
+ public static CancellationContext getCancellationContext() {
+ return CANCELLATION_CONTEXT.get();
+ }
+
+
private boolean remove = true;
protected RpcContext() {
@@ -201,6 +214,7 @@ public class RpcContext {
}
SERVER_LOCAL.remove();
SERVICE_CONTEXT.remove();
+ CANCELLATION_CONTEXT.remove();
}
/**
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
index 86345f6..05cfa00 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcInvocation.java
@@ -86,6 +86,16 @@ public class RpcInvocation implements Invocation, Serializable {
private transient InvokeMode invokeMode;
+ private transient CancellationContext cancellationContext;
+
+ public CancellationContext getCancellationContext() {
+ return cancellationContext;
+ }
+
+ public void setCancellationContext(CancellationContext cancellationContext) {
+ this.cancellationContext = cancellationContext;
+ }
+
public RpcInvocation() {
}
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
index 8ecc368..8c2657c 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java
@@ -651,5 +651,4 @@ public class RpcServiceContext extends RpcContext {
RpcServiceContext rpcContext = RpcContext.getServiceContext();
rpcContext.setConsumerUrl(url);
}
-
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
index b0898ba..9d34461 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractClientStream.java
@@ -17,16 +17,19 @@
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpHeaderValues;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.api.Connection;
+import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.triple.TripleWrapper;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpHeaderValues;
+import io.netty.handler.codec.http2.Http2Error;
+
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
@@ -164,6 +167,7 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
}
protected class ClientStreamObserver implements StreamObserver<Object> {
+
@Override
public void onNext(Object data) {
RpcInvocation invocation = (RpcInvocation) data;
@@ -184,4 +188,8 @@ public abstract class AbstractClientStream extends AbstractStream implements Str
}
}
+ @Override
+ protected void cancelByRemoteReset(Http2Error http2Error) {
+ DefaultFuture2.getFuture(getRequest().getId()).cancel();
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
index 016dd64..7069ab9 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractServerStream.java
@@ -17,13 +17,13 @@
package org.apache.dubbo.rpc.protocol.tri;
-import com.google.protobuf.Message;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.FrameworkServiceRepository;
import org.apache.dubbo.rpc.model.MethodDescriptor;
@@ -32,6 +32,9 @@ import org.apache.dubbo.rpc.model.ScopeModelUtil;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.triple.TripleWrapper;
+import com.google.protobuf.Message;
+import io.netty.handler.codec.http2.Http2Error;
+
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -119,8 +122,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
protected RpcInvocation buildInvocation(Metadata metadata) {
RpcInvocation inv = new RpcInvocation(getUrl().getServiceModel(),
- getMethodName(), getServiceDescriptor().getServiceName(),
- getUrl().getProtocolServiceKey(), getMethodDescriptor().getParameterClasses(), new Object[0]);
+ getMethodName(), getServiceDescriptor().getServiceName(),
+ getUrl().getProtocolServiceKey(), getMethodDescriptor().getParameterClasses(), new Object[0]);
inv.setTargetServiceUniqueName(getUrl().getServiceKey());
inv.setReturnTypes(getMethodDescriptor().getReturnTypes());
@@ -130,6 +133,9 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
for (HeaderFilter headerFilter : getHeaderFilters()) {
inv = headerFilter.invoke(getInvoker(), inv);
}
+ if (getCancellationContext() == null) {
+ setCancellationContext(RpcContext.getCancellationContext());
+ }
return inv;
}
@@ -224,4 +230,8 @@ public abstract class AbstractServerStream extends AbstractStream implements Str
return this;
}
+ @Override
+ protected void cancelByRemoteReset(Http2Error http2Error) {
+ getCancellationContext().cancel(null);
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
index 3b8f7fe..cbd5413 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/AbstractStream.java
@@ -17,10 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri;
-import com.google.protobuf.Any;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.Status;
-import io.netty.handler.codec.http2.Http2Headers;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.serialize.MultipleSerialization;
@@ -29,10 +25,17 @@ import org.apache.dubbo.common.threadlocal.NamedInternalThreadFactory;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.config.Constants;
import org.apache.dubbo.remoting.exchange.Request;
+import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.model.MethodDescriptor;
import org.apache.dubbo.rpc.model.ServiceDescriptor;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
+import com.google.protobuf.Any;
+import com.google.rpc.DebugInfo;
+import com.google.rpc.Status;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Headers;
+
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
@@ -73,10 +76,25 @@ public abstract class AbstractStream implements Stream {
private StreamObserver<Object> streamSubscriber;
private TransportObserver transportSubscriber;
+ private CancellationContext cancellationContext;
+ private boolean cancelled = false;
+
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
protected AbstractStream(URL url) {
this(url, allocateCallbackExecutor());
}
+ protected CancellationContext getCancellationContext() {
+ return cancellationContext;
+ }
+
+ protected void setCancellationContext(CancellationContext cancellationContext) {
+ this.cancellationContext = cancellationContext;
+ }
+
protected AbstractStream(URL url, Executor executor) {
this.url = url;
this.executor = executor;
@@ -119,6 +137,27 @@ public abstract class AbstractStream implements Stream {
return this;
}
+ /**
+ * local cancel
+ *
+ * @param cause cancel case
+ */
+ protected void cancel(Throwable cause) {
+ getCancellationContext().cancel(cause);
+ }
+
+ /**
+ * remote cancel
+ *
+ * @param http2Error {@link Http2Error}
+ */
+ protected final void cancelByRemote(Http2Error http2Error) {
+ cancelled = true;
+ cancelByRemoteReset(http2Error);
+ }
+
+ protected abstract void cancelByRemoteReset(Http2Error http2Error);
+
protected abstract StreamObserver<Object> createStreamObserver();
protected abstract TransportObserver createTransportObserver();
@@ -198,12 +237,12 @@ public abstract class AbstractStream implements Stream {
getTransportSubscriber().onMetadata(trailers, true);
if (LOGGER.isErrorEnabled()) {
LOGGER.error("[Triple-Server-Error] status=" + status.code.code + " service=" + getServiceDescriptor().getServiceName()
- + " method=" + getMethodName() +" onlyTrailers=" + onlyTrailers, status.cause);
+ + " method=" + getMethodName() + " onlyTrailers=" + onlyTrailers, status.cause);
}
}
protected void transportError(GrpcStatus status, Map<String, Object> attachments) {
- transportError(status, attachments,false);
+ transportError(status, attachments, false);
}
protected void transportError(GrpcStatus status) {
@@ -236,11 +275,11 @@ public abstract class AbstractStream implements Stream {
if (throwable == null) {
Status status = builder.build();
metadata.put(TripleHeaderEnum.STATUS_DETAIL_KEY.getHeader(),
- TripleUtil.encodeBase64ASCII(status.toByteArray()));
+ TripleUtil.encodeBase64ASCII(status.toByteArray()));
return metadata;
}
DebugInfo debugInfo = DebugInfo.newBuilder()
- .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable,10))
+ .addAllStackEntries(ExceptionUtils.getStackFrameList(throwable, 10))
// can not use now
// .setDetail(throwable.getMessage())
.build();
@@ -303,7 +342,7 @@ public abstract class AbstractStream implements Stream {
}
}
- protected static abstract class AbstractTransportObserver implements TransportObserver {
+ protected abstract class AbstractTransportObserver implements TransportObserver {
private Metadata headers;
private Metadata trailers;
@@ -316,6 +355,11 @@ public abstract class AbstractStream implements Stream {
}
@Override
+ public void onReset(Http2Error http2Error) {
+ getTransportSubscriber().onReset(http2Error);
+ }
+
+ @Override
public void onMetadata(Metadata metadata, boolean endStream) {
if (headers == null) {
headers = metadata;
@@ -343,7 +387,7 @@ public abstract class AbstractStream implements Stream {
}
- protected abstract static class UnaryTransportObserver extends AbstractTransportObserver implements TransportObserver {
+ protected abstract class UnaryTransportObserver extends AbstractTransportObserver implements TransportObserver {
private byte[] data;
public byte[] getData() {
@@ -362,7 +406,7 @@ public abstract class AbstractStream implements Stream {
}
}
- protected abstract void doOnComplete() ;
+ protected abstract void doOnComplete();
@Override
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
index cd3f1e6..78bda21 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ClientTransportObserver.java
@@ -24,6 +24,8 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
+import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
@@ -36,6 +38,7 @@ public class ClientTransportObserver implements TransportObserver {
private final ChannelPromise promise;
private boolean headerSent = false;
private boolean endStreamSent = false;
+ private boolean resetSent = false;
public ClientTransportObserver(ChannelHandlerContext ctx, AbstractClientStream stream, ChannelPromise promise) {
@@ -60,32 +63,55 @@ public class ClientTransportObserver implements TransportObserver {
@Override
public void onMetadata(Metadata metadata, boolean endStream) {
- if (!headerSent) {
- final Http2Headers headers = new DefaultHttp2Headers(true)
- .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
- .authority(metadata.get(TripleHeaderEnum.AUTHORITY_KEY.getHeader()))
- .scheme(SCHEME)
- .method(HttpMethod.POST.asciiName());
- metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
- headerSent = true;
- streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
- .addListener(future -> {
- if (!future.isSuccess()) {
- promise.tryFailure(future.cause());
- }
- });
+ if (headerSent) {
+ return;
}
+ if (resetSent) {
+ return;
+ }
+ final Http2Headers headers = new DefaultHttp2Headers(true)
+ .path(metadata.get(TripleHeaderEnum.PATH_KEY.getHeader()))
+ .authority(metadata.get(TripleHeaderEnum.AUTHORITY_KEY.getHeader()))
+ .scheme(SCHEME)
+ .method(HttpMethod.POST.asciiName());
+ metadata.forEach(e -> headers.set(e.getKey(), e.getValue()));
+ headerSent = true;
+ streamChannel.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ promise.tryFailure(future.cause());
+ }
+ });
+
+ }
+
+ @Override
+ public void onReset(Http2Error http2Error) {
+ resetSent = true;
+ streamChannel.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ promise.trySuccess();
+ } else {
+ promise.tryFailure(future.cause());
+ }
+ });
}
@Override
public void onData(byte[] data, boolean endStream) {
+ if (resetSent) {
+ return;
+ }
ByteBuf buf = ctx.alloc().buffer();
buf.writeByte(0);
buf.writeInt(data.length);
buf.writeBytes(data);
streamChannel.writeAndFlush(new DefaultHttp2DataFrame(buf, endStream))
.addListener(future -> {
- if (!future.isSuccess()) {
+ if (future.isSuccess()) {
+ promise.trySuccess();
+ } else {
promise.tryFailure(future.cause());
}
});
@@ -93,16 +119,20 @@ public class ClientTransportObserver implements TransportObserver {
@Override
public void onComplete() {
- if (!endStreamSent) {
- endStreamSent = true;
- streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
- .addListener(future -> {
- if (future.isSuccess()) {
- promise.trySuccess();
- } else {
- promise.tryFailure(future.cause());
- }
- });
+ if (resetSent) {
+ return;
}
+ if (endStreamSent) {
+ return;
+ }
+ endStreamSent = true;
+ streamChannel.writeAndFlush(new DefaultHttp2DataFrame(true))
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ promise.trySuccess();
+ } else {
+ promise.tryFailure(future.cause());
+ }
+ });
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
index 27e568c..e7901ed 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerStream.java
@@ -103,8 +103,7 @@ public class ServerStream extends AbstractServerStream implements Stream {
getStreamSubscriber().onNext(arguments[0]);
}
}
- } catch (
- Throwable t) {
+ } catch (Throwable t) {
transportError(GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
.withDescription("Deserialize request failed")
.withCause(t));
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
index a15e5e0..fdb5085 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServerTransportObserver.java
@@ -17,19 +17,27 @@
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
+import io.netty.handler.codec.http2.Http2Error;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
public class ServerTransportObserver implements TransportObserver {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ServerTransportObserver.class);
+
private final ChannelHandlerContext ctx;
private final ChannelPromise promise;
private boolean headerSent = false;
+ private boolean resetSent = false;
public ServerTransportObserver(ChannelHandlerContext ctx, ChannelPromise promise) {
this.ctx = ctx;
@@ -38,6 +46,9 @@ public class ServerTransportObserver implements TransportObserver {
@Override
public void onMetadata(Metadata metadata, boolean endStream) {
+ if (resetSent) {
+ return;
+ }
final DefaultHttp2Headers headers = new DefaultHttp2Headers(true);
metadata.forEach(e -> {
headers.set(e.getKey(), e.getValue());
@@ -48,19 +59,38 @@ public class ServerTransportObserver implements TransportObserver {
headers.set(TripleHeaderEnum.CONTENT_TYPE_KEY.getHeader(), TripleConstant.CONTENT_PROTO);
}
ctx.writeAndFlush(new DefaultHttp2HeadersFrame(headers, endStream))
- .addListener(future -> {
- if (!future.isSuccess()) {
- promise.tryFailure(future.cause());
- }
- });
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ LOGGER.warn("write header error", future.cause());
+ }
+ });
+ }
+
+ @Override
+ public void onReset(Http2Error http2Error) {
+ resetSent = true;
+ ctx.writeAndFlush(new DefaultHttp2ResetFrame(http2Error))
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ LOGGER.warn("write reset error", future.cause());
+ }
+ });
}
@Override
public void onData(byte[] data, boolean endStream) {
+ if (resetSent) {
+ return;
+ }
ByteBuf buf = ctx.alloc().buffer();
buf.writeByte(0);
buf.writeInt(data.length);
buf.writeBytes(data);
- ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false));
+ ctx.writeAndFlush(new DefaultHttp2DataFrame(buf, false))
+ .addListener(future -> {
+ if (!future.isSuccess()) {
+ LOGGER.warn("write data error", future.cause());
+ }
+ });
}
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
index 64d833b..77e283e 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TransportObserver.java
@@ -17,12 +17,18 @@
package org.apache.dubbo.rpc.protocol.tri;
+import io.netty.handler.codec.http2.Http2Error;
+
public interface TransportObserver {
void onMetadata(Metadata metadata, boolean endStream);
void onData(byte[] data, boolean endStream);
- default void onComplete(){}
+ default void onReset(Http2Error http2Error) {
+ }
+
+ default void onComplete() {
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
index 1fb4bf0..9d1795a 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleClientHandler.java
@@ -28,6 +28,7 @@ import org.apache.dubbo.remoting.exchange.Request;
import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.AppResponse;
+import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ConsumerModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -36,6 +37,7 @@ import org.apache.dubbo.rpc.model.MethodDescriptor;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2SettingsFrame;
import io.netty.util.ReferenceCountUtil;
@@ -88,6 +90,13 @@ public class TripleClientHandler extends ChannelDuplexHandler {
} else {
stream = AbstractClientStream.stream(url);
}
+ final CancellationContext cancellationContext = inv.getCancellationContext();
+ // for client cancel,send rst frame to server
+ cancellationContext.addListener(context -> {
+ stream.asTransportObserver().onReset(Http2Error.CANCEL);;
+ });
+ stream.setCancellationContext(cancellationContext);
+
String ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY);
if (StringUtils.isNotEmpty(ssl)) {
ctx.channel().attr(TripleConstant.SSL_ATTRIBUTE_KEY).set(Boolean.parseBoolean(ssl));
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
index 2d2a4bd..f1a2f74 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2ClientResponseHandler.java
@@ -22,9 +22,11 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http2.Http2DataFrame;
+import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2GoAwayFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.handler.codec.http2.Http2ResetFrame;
import io.netty.handler.codec.http2.Http2StreamFrame;
public final class TripleHttp2ClientResponseHandler extends SimpleChannelInboundHandler<Http2StreamFrame> {
@@ -42,6 +44,8 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
ctx.close();
logger.debug(
"Event triggered, event name is: " + event.name() + ", last stream id is: " + event.lastStreamId());
+ } else if (evt instanceof Http2ResetFrame) {
+ onResetRead(ctx, (Http2ResetFrame) evt);
}
}
@@ -56,6 +60,12 @@ public final class TripleHttp2ClientResponseHandler extends SimpleChannelInbound
}
}
+ private void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame resetFrame) {
+ AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
+ clientStream.cancelByRemote(Http2Error.valueOf(resetFrame.errorCode()));
+ ctx.close();
+ }
+
private void onHeadersRead(ChannelHandlerContext ctx, Http2HeadersFrame msg) {
Http2Headers headers = msg.headers();
AbstractClientStream clientStream = TripleUtil.getClientStream(ctx);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
index 0c0daf9..34ee52f 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2FrameServerHandler.java
@@ -16,19 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.handler.codec.http.HttpHeaderNames;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.HttpUtil;
-import io.netty.handler.codec.http2.Http2DataFrame;
-import io.netty.handler.codec.http2.Http2Frame;
-import io.netty.handler.codec.http2.Http2Headers;
-import io.netty.handler.codec.http2.Http2HeadersFrame;
-import io.netty.util.ReferenceCountUtil;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
@@ -43,11 +30,23 @@ import org.apache.dubbo.rpc.model.ProviderModel;
import org.apache.dubbo.rpc.protocol.tri.GrpcStatus.Code;
import org.apache.dubbo.rpc.service.ServiceDescriptorInternalCache;
-import java.util.List;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http2.Http2DataFrame;
+import io.netty.handler.codec.http2.Http2Error;
+import io.netty.handler.codec.http2.Http2Frame;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.handler.codec.http2.Http2ResetFrame;
+import io.netty.util.ReferenceCountUtil;
-import static org.apache.dubbo.rpc.protocol.tri.GrpcStatus.rpcExceptionCodeToGrpc;
-import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responseErr;
-import static org.apache.dubbo.rpc.protocol.tri.TripleUtil.responsePlainTextError;
+import java.util.List;
public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(TripleHttp2FrameServerHandler.class);
@@ -56,7 +55,7 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
public TripleHttp2FrameServerHandler(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
- PATH_RESOLVER = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
+ this.PATH_RESOLVER = frameworkModel.getExtensionLoader(PathResolver.class).getDefaultExtension();
}
@Override
@@ -74,15 +73,31 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
}
@Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof Http2ResetFrame) {
+ onResetRead(ctx, (Http2ResetFrame) evt);
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+
+ public void onResetRead(ChannelHandlerContext ctx, Http2ResetFrame frame) {
+ Http2Error http2Error = Http2Error.valueOf(frame.errorCode());
+ final AbstractServerStream serverStream = TripleUtil.getServerStream(ctx);
+ serverStream.cancelByRemote(http2Error);
+ ctx.close();
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Exception in processing triple message", cause);
}
if (cause instanceof RpcException) {
- TripleUtil.responseErr(ctx, rpcExceptionCodeToGrpc(((RpcException) cause).getCode()));
+ TripleUtil.responseErr(ctx, GrpcStatus.rpcExceptionCodeToGrpc(((RpcException) cause).getCode()));
} else {
TripleUtil.responseErr(ctx, GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription("Provider's error:\n" + cause.getMessage()));
+ .withDescription("Provider's error:\n" + cause.getMessage()));
}
}
@@ -99,9 +114,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
private Invoker<?> getInvoker(Http2Headers headers, String serviceName) {
final String version = headers.contains(TripleHeaderEnum.SERVICE_VERSION.getHeader()) ? headers.get(
- TripleHeaderEnum.SERVICE_VERSION.getHeader()).toString() : null;
+ TripleHeaderEnum.SERVICE_VERSION.getHeader()).toString() : null;
final String group = headers.contains(TripleHeaderEnum.SERVICE_GROUP.getHeader()) ? headers.get(TripleHeaderEnum.SERVICE_GROUP.getHeader())
- .toString() : null;
+ .toString() : null;
final String key = URL.buildKey(serviceName, group, version);
Invoker<?> invoker = PATH_RESOLVER.resolve(key);
if (invoker == null) {
@@ -114,45 +129,45 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
final Http2Headers headers = msg.headers();
if (!HttpMethod.POST.asciiName().contentEquals(headers.method())) {
- responsePlainTextError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
- GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withDescription(String.format("Method '%s' is not supported", headers.method())));
+ TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.METHOD_NOT_ALLOWED.code(),
+ GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
+ .withDescription(String.format("Method '%s' is not supported", headers.method())));
return;
}
if (headers.path() == null) {
- responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
- GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
+ TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED.code).withDescription("Expected path but is missing"));
return;
}
final String path = headers.path().toString();
if (path.charAt(0) != '/') {
- responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
- GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
- .withDescription(String.format("Expected path to start with /: %s", path)));
+ TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.NOT_FOUND.code(),
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED.code)
+ .withDescription(String.format("Expected path to start with /: %s", path)));
return;
}
final CharSequence contentType = HttpUtil.getMimeType(headers.get(HttpHeaderNames.CONTENT_TYPE));
if (contentType == null) {
- responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
- GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
- .withDescription("Content-Type is missing from the request"));
+ TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+ GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL.code)
+ .withDescription("Content-Type is missing from the request"));
return;
}
final String contentString = contentType.toString();
if (!TripleUtil.supportContentType(contentString)) {
- responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
- GrpcStatus.fromCode(Code.INTERNAL.code)
- .withDescription(String.format("Content-Type '%s' is not supported", contentString)));
+ TripleUtil.responsePlainTextError(ctx, HttpResponseStatus.UNSUPPORTED_MEDIA_TYPE.code(),
+ GrpcStatus.fromCode(Code.INTERNAL.code)
+ .withDescription(String.format("Content-Type '%s' is not supported", contentString)));
return;
}
String[] parts = path.split("/");
if (parts.length != 3) {
- responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Bad path format:" + path));
+ TripleUtil.responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Bad path format:" + path));
return;
}
String serviceName = parts[1];
@@ -161,15 +176,15 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
final Invoker<?> invoker = getInvoker(headers, serviceName);
if (invoker == null) {
- responseErr(ctx,
- GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+ TripleUtil.responseErr(ctx,
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
return;
}
FrameworkServiceRepository repo = frameworkModel.getServiceRepository();
ProviderModel providerModel = repo.lookupExportedService(invoker.getUrl().getServiceKey());
if (providerModel == null || providerModel.getServiceModel() == null) {
- responseErr(ctx,
- GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
+ TripleUtil.responseErr(ctx,
+ GrpcStatus.fromCode(Code.UNIMPLEMENTED).withDescription("Service not found:" + serviceName));
return;
}
@@ -185,8 +200,8 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
} else {
methodDescriptors = providerModel.getServiceModel().getMethods(methodName);
if (CollectionUtils.isEmpty(methodDescriptors)) {
- responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
- .withDescription("Method :" + methodName + " not found of service:" + serviceName));
+ TripleUtil.responseErr(ctx, GrpcStatus.fromCode(Code.UNIMPLEMENTED)
+ .withDescription("Method :" + methodName + " not found of service:" + serviceName));
return;
}
// In most cases there is only one method
@@ -208,9 +223,9 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
}
});
stream.service(providerModel.getServiceModel())
- .invoker(invoker)
- .methodName(methodName)
- .subscribe(new ServerTransportObserver(ctx,promise));
+ .invoker(invoker)
+ .methodName(methodName)
+ .subscribe(new ServerTransportObserver(ctx, promise));
if (methodDescriptor != null) {
stream.method(methodDescriptor);
} else {
@@ -235,4 +250,8 @@ public class TripleHttp2FrameServerHandler extends ChannelDuplexHandler {
return CommonConstants.$INVOKE.equals(methodName) || CommonConstants.$INVOKE_ASYNC.equals(methodName);
}
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ super.write(ctx, msg, promise);
+ }
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 79eba43..a3d4c31 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -16,6 +16,14 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.extension.Activate;
+import org.apache.dubbo.remoting.api.Http2WireProtocol;
+import org.apache.dubbo.rpc.model.FrameworkModel;
+import org.apache.dubbo.rpc.model.ScopeModelAware;
+
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -24,13 +32,6 @@ import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.ssl.SslContext;
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.config.Configuration;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.extension.Activate;
-import org.apache.dubbo.remoting.api.Http2WireProtocol;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.ScopeModelAware;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_HEADER_TABLE_SIZE_KEY;
@@ -59,11 +60,11 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
final Http2FrameCodec codec = Http2FrameCodecBuilder.forServer()
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings()
- .headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
- .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
- .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
- .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
- .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
+ .headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
+ .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
+ .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
+ .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
+ .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
.frameLogger(SERVER_LOGGER)
.build();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(new TripleServerInitializer(frameworkModel));
@@ -82,12 +83,12 @@ public class TripleHttp2Protocol extends Http2WireProtocol implements ScopeModel
final Http2FrameCodec codec = Http2FrameCodecBuilder.forClient()
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings()
- .headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
- .pushEnabled(config.getBoolean(H2_SETTINGS_ENABLE_PUSH_KEY, false))
- .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
- .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
- .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
- .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
+ .headerTableSize(config.getInt(H2_SETTINGS_HEADER_TABLE_SIZE_KEY, 4096))
+ .pushEnabled(config.getBoolean(H2_SETTINGS_ENABLE_PUSH_KEY, false))
+ .maxConcurrentStreams(config.getInt(H2_SETTINGS_MAX_CONCURRENT_STREAMS_KEY, Integer.MAX_VALUE))
+ .initialWindowSize(config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, 1 << 20))
+ .maxFrameSize(config.getInt(H2_SETTINGS_MAX_FRAME_SIZE_KEY, 2 << 14))
+ .maxHeaderListSize(config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 8192)))
.frameLogger(CLIENT_LOGGER)
.build();
final Http2MultiplexHandler handler = new Http2MultiplexHandler(new SimpleChannelInboundHandler<Object>() {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
index f049f9d..e0afab2 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.channel.ChannelFuture;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.utils.StringUtils;
@@ -30,6 +29,7 @@ import org.apache.dubbo.remoting.exchange.Response;
import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
+import org.apache.dubbo.rpc.CancellationContext;
import org.apache.dubbo.rpc.FutureContext;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -41,6 +41,8 @@ import org.apache.dubbo.rpc.TimeoutCountDown;
import org.apache.dubbo.rpc.protocol.AbstractInvoker;
import org.apache.dubbo.rpc.support.RpcUtils;
+import io.netty.channel.ChannelFuture;
+
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
@@ -77,6 +79,11 @@ public class TripleInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
+
+ // set cancel context to RpcInvocation to transport to stream
+ final CancellationContext cancellationContext = RpcContext.getCancellationContext();
+ inv.setCancellationContext(cancellationContext);
+
final String methodName = RpcUtils.getMethodName(invocation);
inv.setServiceModel(RpcContext.getServiceContext().getConsumerUrl().getServiceModel());
inv.setAttachment(PATH_KEY, getUrl().getPath());
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
index 7495c80..a3156f3 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleServerConnectionHandler.java
@@ -51,6 +51,11 @@ public class TripleServerConnectionHandler extends Http2ChannelDuplexHandler {
}
@Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ super.userEventTriggered(ctx, evt);
+ }
+
+ @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// this may be change in future follow https://github.com/apache/dubbo/pull/8644
if (TripleUtil.isQuiteException(cause)) {
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
index 415bdc5..2f431b8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryClientStream.java
@@ -17,9 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri;
-import com.google.protobuf.Any;
-import com.google.rpc.DebugInfo;
-import com.google.rpc.Status;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.exchange.Response;
@@ -27,6 +24,10 @@ import org.apache.dubbo.remoting.exchange.support.DefaultFuture2;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.RpcException;
+import com.google.protobuf.Any;
+import com.google.rpc.DebugInfo;
+import com.google.rpc.Status;
+
import java.util.List;
import java.util.Map;
@@ -66,8 +67,8 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
DefaultFuture2.received(getConnection(), response);
} catch (Exception e) {
final GrpcStatus status = GrpcStatus.fromCode(GrpcStatus.Code.INTERNAL)
- .withCause(e)
- .withDescription("Failed to deserialize response");
+ .withCause(e)
+ .withDescription("Failed to deserialize response");
onError(status);
}
});
@@ -112,7 +113,7 @@ public class UnaryClientStream extends AbstractClientStream implements Stream {
DebugInfo debugInfo = (DebugInfo) classObjectMap.get(DebugInfo.class);
if (debugInfo == null) {
return new RpcException(statusDetail.getCode(),
- statusDetail.getMessage());
+ statusDetail.getMessage());
}
String msg = ExceptionUtils.getStackFrameString(debugInfo.getStackEntriesList());
return new RpcException(statusDetail.getCode(), msg);
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
index 27005a6..2140463 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/UnaryServerStream.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.rpc.protocol.tri;
-import io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.stream.StreamObserver;
import org.apache.dubbo.remoting.TimeoutException;
@@ -27,6 +26,8 @@ import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.RpcInvocation;
+import io.netty.handler.codec.http.HttpHeaderNames;
+
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;