Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2022/10/14 01:03:24 UTC

[GitHub] [dubbo] asa3311 opened a new pull request, #10748: triple flowcontrol

asa3311 opened a new pull request, #10748:
URL: https://github.com/apache/dubbo/pull/10748

   triple flowcontrol sub 3.2 branch
   
   ## What is the purpose of the change
   
   
   
   ## Brief changelog
   
   
   ## Verifying this change
   
   
   <!-- Follow this checklist to help us incorporate your contribution quickly and easily: -->
   
   ## Checklist
   - [x] Make sure there is a [GitHub_issue](https://github.com/apache/dubbo/issues) field for the change (usually before you start working on it). Trivial changes like typos do not require a GitHub issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
   - [ ] Each commit in the pull request should have a meaningful subject line and body.
   - [ ] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
   - [ ] Check whether it is necessary to patch Dubbo 3 if you are working on Dubbo 2.7.
   - [ ] Write the unit tests needed to verify your logic, preferring mocks where cross-module dependencies exist. If a new feature or significant change is committed, please remember to add a sample to the [dubbo samples](https://github.com/apache/dubbo-samples) project.
   - [ ] Add some description to [dubbo-website](https://github.com/apache/dubbo-website) project if you are requesting to add a feature.
   - [ ] GitHub Actions works fine on your own branch.
   - [ ] If this contribution is large, please follow the [Software Donation Guide](https://github.com/apache/dubbo/wiki/Software-donation-guide).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo] asa3311 commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
asa3311 commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r1000800811


##########
dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ServiceAnnotationPostProcessor.java:
##########
@@ -354,7 +355,12 @@ private void processScannedBeanDefinition(BeanDefinitionHolder beanDefinitionHol
     private Annotation findServiceAnnotation(Class<?> beanClass) {
         return serviceAnnotationTypes
                 .stream()
-                .map(annotationType -> findAnnotation(beanClass, annotationType))
+            .map(annotationType ->

Review Comment:
   Removed



##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java:
##########
@@ -186,6 +186,9 @@ public List<URL> getUrls() {
     @Override
     public void setUrls(List<URL> urls) {
         this.urls = urls;
+        if (!urls.isEmpty()) {

Review Comment:
   Removed



##########
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java:
##########
@@ -99,7 +99,14 @@ protected Result doInvoke(final Invocation invocation) throws Throwable {
         }
         try {
             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
+
             int timeout = calculateTimeout(invocation, methodName);
+            if (timeout <= 0) {

Review Comment:
   Removed





[GitHub] [dubbo] asa3311 commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
asa3311 commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r1000801111


##########
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java:
##########
@@ -108,7 +115,7 @@ protected Result doInvoke(final Invocation invocation) throws Throwable {
             } else {
                 ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                 CompletableFuture<AppResponse> appResponseFuture =
-                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
+                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);

Review Comment:
   Removed



##########
dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java:
##########
@@ -102,7 +102,15 @@ public Result doInvoke(Invocation invocation) throws Throwable {
         if (serverHasToken) {
             invocation.setAttachment(Constants.TOKEN_KEY, serverURL.getParameter(Constants.TOKEN_KEY));
         }
-        invocation.setAttachment(TIMEOUT_KEY, calculateTimeout(invocation, invocation.getMethodName()));
+
+        int timeout = calculateTimeout(invocation, invocation.getMethodName());

Review Comment:
   Removed





[GitHub] [dubbo] asa3311 commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
asa3311 commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r1000801836


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java:
##########
@@ -192,8 +192,15 @@ StreamObserver<Object> streamCall(ClientCall call,
     AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation invocation,
         ClientCall call) {
         ExecutorService callbackExecutor = getCallbackExecutor(getUrl(), invocation);
+
         int timeout = calculateTimeout(invocation, invocation.getMethodName());
+        if (timeout <= 0) {

Review Comment:
   Removed



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java:
##########
@@ -48,6 +48,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+

Review Comment:
   done





[GitHub] [dubbo] asa3311 commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
asa3311 commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r997861986


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is adapted from {@link io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = autoRefillConnectionWindow ?
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.AutoRefillState(connection.connectionStream(), initialWindowSize) :
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(connection.connectionStream(), initialWindowSize);
+        connection.connectionStream().setProperty(stateKey, connectionState);
+
+        // Register for notification of new streams.
+        connection.addListener(new Http2ConnectionAdapter() {
+            @Override
+            public void onStreamAdded(Http2Stream stream) {
+                // Unconditionally use the reduced flow control state because it requires no object allocation
+                // and the DefaultState will be allocated in onStreamActive.
+                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+            }
+
+            @Override
+            public void onStreamActive(Http2Stream stream) {
+                // Need to be sure the stream's initial window is adjusted for SETTINGS
+                // frames which may have been exchanged while it was in IDLE
+                stream.setProperty(stateKey, new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(stream, initialWindowSize));
+            }
+
+            @Override
+            public void onStreamClosed(Http2Stream stream) {
+                try {
+                    // When a stream is closed, consume any remaining bytes so that they
+                    // are restored to the connection window.
+                    org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+                    int unconsumedBytes = state.unconsumedBytes();
+                    if (ctx != null && unconsumedBytes > 0) {
+                        if (consumeAllBytes(state, unconsumedBytes)) {
+                            // The user has no real control over when this callback runs, so
+                            // call flush() if we produced any window update to ensure it does not go stale.
+                            ctx.flush();
+                        }
+                    }
+                } catch (Http2Exception e) {
+                    PlatformDependent.throwException(e);
+                } finally {
+                    // Unconditionally reduce the amount of memory required for flow control because there are no
+                    // object allocation costs associated with doing so and the stream has no more
+                    // local flow control state to keep track of.
+                    stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+                }
+            }
+        });
+    }
+
+    @Override
+    public TriHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
+        this.frameWriter = checkNotNull(frameWriter, "frameWriter");
+        return this;
+    }
+
+    @Override
+    public void channelHandlerContext(ChannelHandlerContext ctx) {
+        this.ctx = checkNotNull(ctx, "ctx");
+    }
+
+    @Override
+    public void initialWindowSize(int newWindowSize) throws Http2Exception {
+        assert ctx == null || ctx.executor().inEventLoop();
+        int delta = newWindowSize - initialWindowSize;
+        initialWindowSize = newWindowSize;
+
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor visitor = new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor(delta);
+        connection.forEachActiveStream(visitor);
+        visitor.throwIfError();
+    }
+
+    @Override
+    public int initialWindowSize() {
+        return initialWindowSize;
+    }
+
+    @Override
+    public int windowSize(Http2Stream stream) {
+        return state(stream).windowSize();
+    }
+
+    @Override
+    public int initialWindowSize(Http2Stream stream) {
+        return state(stream).initialWindowSize();
+    }
+
+    @Override
+    public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
+        assert ctx != null && ctx.executor().inEventLoop();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+        // Just add the delta to the stream-specific initial window size so that the next time the window
+        // expands it will grow to the new initial size.
+        state.incrementInitialStreamWindow(delta);
+        state.writeWindowUpdateIfNeeded();
+    }
+
+    public boolean consumeTriBytes(Http2Stream stream, int numBytes)  throws Http2Exception {

Review Comment:
   change to void
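
The accounting behind that comment can be sketched without Netty. The following is a minimal model only, assuming the same two-counter scheme the class Javadoc describes (a window that shrinks on receipt and a processed window that shrinks on consumption, with an update once the processed window falls to the ratio threshold). `ReceiveWindowSketch` and all of its members are hypothetical names, not part of Dubbo or Netty, and `consume` returns `void` as the comment suggests.

```java
/**
 * Illustrative model of receive-side HTTP/2 window accounting.
 * All names here are hypothetical; this is not Dubbo or Netty code.
 */
final class ReceiveWindowSketch {
    private final int initialWindow;
    private final float updateRatio;
    private int window;          // shrinks as soon as DATA arrives
    private int processedWindow; // shrinks only when the application consumes the data
    private int updatesSent;     // stands in for WINDOW_UPDATE frames written
    private int lastUpdateDelta; // size of the most recent simulated update

    ReceiveWindowSketch(int initialWindow, float updateRatio) {
        if (initialWindow <= 0 || updateRatio <= 0f || updateRatio >= 1f) {
            throw new IllegalArgumentException("invalid window or ratio");
        }
        this.initialWindow = initialWindow;
        this.updateRatio = updateRatio;
        this.window = initialWindow;
        this.processedWindow = initialWindow;
    }

    /** Called when a DATA frame of numBytes arrives. */
    void receive(int numBytes) {
        if (numBytes < 0 || numBytes > window) {
            throw new IllegalStateException("flow control window exceeded");
        }
        window -= numBytes;
    }

    /** Returns nothing: the caller only reports that bytes were processed. */
    void consume(int numBytes) {
        if (numBytes < 0 || processedWindow - numBytes < window) {
            throw new IllegalStateException("returning more bytes than were received");
        }
        processedWindow -= numBytes;
        int threshold = (int) (initialWindow * updateRatio);
        if (processedWindow <= threshold) {
            // Restore both counters to the initial size, as a WINDOW_UPDATE would.
            int delta = initialWindow - processedWindow;
            window += delta;
            processedWindow += delta;
            lastUpdateDelta = delta;
            updatesSent++;
        }
    }

    int window() { return window; }
    int updatesSent() { return updatesSent; }
    int lastUpdateDelta() { return lastUpdateDelta; }
}
```

With an initial window of 100 and a ratio of 0.5, receiving and consuming 30 bytes leaves the processed window at 70, above the threshold of 50, so nothing is sent. A second round of 30 bytes drops it to 40, which triggers one simulated update of 60 bytes and restores the window to 100.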



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is adapted from {@link io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = autoRefillConnectionWindow ?
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.AutoRefillState(connection.connectionStream(), initialWindowSize) :
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(connection.connectionStream(), initialWindowSize);
+        connection.connectionStream().setProperty(stateKey, connectionState);
+
+        // Register for notification of new streams.
+        connection.addListener(new Http2ConnectionAdapter() {
+            @Override
+            public void onStreamAdded(Http2Stream stream) {
+                // Unconditionally use the reduced flow control state because it requires no object allocation
+                // and the DefaultState will be allocated in onStreamActive.
+                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+            }
+
+            @Override
+            public void onStreamActive(Http2Stream stream) {
+                // Need to be sure the stream's initial window is adjusted for SETTINGS
+                // frames which may have been exchanged while it was in IDLE
+                stream.setProperty(stateKey, new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(stream, initialWindowSize));
+            }
+
+            @Override
+            public void onStreamClosed(Http2Stream stream) {
+                try {
+                    // When a stream is closed, consume any remaining bytes so that they
+                    // are restored to the connection window.
+                    org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+                    int unconsumedBytes = state.unconsumedBytes();
+                    if (ctx != null && unconsumedBytes > 0) {
+                        if (consumeAllBytes(state, unconsumedBytes)) {
+                            // The user has no real control over when this callback runs, so
+                            // call flush() if we produced any window update to ensure it does not go stale.
+                            ctx.flush();
+                        }
+                    }
+                } catch (Http2Exception e) {
+                    PlatformDependent.throwException(e);
+                } finally {
+                    // Unconditionally reduce the amount of memory required for flow control because there are no
+                    // object allocation costs associated with doing so and the stream has no more
+                    // local flow control state to keep track of.
+                    stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+                }
+            }
+        });
+    }
+
+    @Override
+    public TriHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
+        this.frameWriter = checkNotNull(frameWriter, "frameWriter");
+        return this;
+    }
+
+    @Override
+    public void channelHandlerContext(ChannelHandlerContext ctx) {
+        this.ctx = checkNotNull(ctx, "ctx");
+    }
+
+    @Override
+    public void initialWindowSize(int newWindowSize) throws Http2Exception {
+        assert ctx == null || ctx.executor().inEventLoop();
+        int delta = newWindowSize - initialWindowSize;
+        initialWindowSize = newWindowSize;
+
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor visitor = new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor(delta);
+        connection.forEachActiveStream(visitor);
+        visitor.throwIfError();
+    }
+
+    @Override
+    public int initialWindowSize() {
+        return initialWindowSize;
+    }
+
+    @Override
+    public int windowSize(Http2Stream stream) {
+        return state(stream).windowSize();
+    }
+
+    @Override
+    public int initialWindowSize(Http2Stream stream) {
+        return state(stream).initialWindowSize();
+    }
+
+    @Override
+    public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
+        assert ctx != null && ctx.executor().inEventLoop();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+        // Just add the delta to the stream-specific initial window size so that the next time the window
+        // expands it will grow to the new initial size.
+        state.incrementInitialStreamWindow(delta);
+        state.writeWindowUpdateIfNeeded();
+    }
+
+    public boolean consumeTriBytes(Http2Stream stream, int numBytes)  throws Http2Exception {
+        // Bytes are consumed on a Triple thread rather than the event loop, so unlike the
+        // other methods this one does not assert ctx.executor().inEventLoop().
+        assert ctx != null;
+        checkPositiveOrZero(numBytes, "numBytes");
+        if (numBytes == 0) {
+            return false;
+        }
+        // Streams automatically consume all remaining bytes when they are closed, so just ignore
+        // if already closed.
+        if (stream != null && !isClosed(stream)) {
+            if (stream.id() == CONNECTION_STREAM_ID) {
+                throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
+            }
+
+            return consumeAllBytes(state(stream), numBytes);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
+        return false;
+    }
+
+    private boolean consumeAllBytes(org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state, int numBytes) throws Http2Exception {
+        // Non-short-circuit OR: both the connection window and the stream window must be updated.
+        return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes);
+    }
+
+    @Override
+    public int unconsumedBytes(Http2Stream stream) {
+        return state(stream).unconsumedBytes();
+    }
+
+    private static void checkValidRatio(float ratio) {
+        if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
+            throw new IllegalArgumentException("Invalid ratio: " + ratio);
+        }
+    }
+
+    /**
+     * The window update ratio is used to determine when a window update must be sent. If the ratio
+     * of bytes processed since the last update has met or exceeded this ratio then a window update will
+     * be sent. This is the global window update ratio that will be used for new streams.
+     * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary for new streams.
+     * @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
+     */
+    public void windowUpdateRatio(float ratio) {
+        assert ctx == null || ctx.executor().inEventLoop();
+        checkValidRatio(ratio);
+        windowUpdateRatio = ratio;
+    }
+
+    /**
+     * The window update ratio is used to determine when a window update must be sent. If the ratio
+     * of bytes processed since the last update has met or exceeded this ratio then a window update will
+     * be sent. This is the global window update ratio that will be used for new streams.
+     */
+    public float windowUpdateRatio() {
+        return windowUpdateRatio;
+    }
+
+    /**
+     * The window update ratio is used to determine when a window update must be sent. If the ratio
+     * of bytes processed since the last update has met or exceeded this ratio then a window update will
+     * be sent. This window update ratio will only be applied to {@code stream}.
+     * <p>
+     * Note it is the responsibility of the caller to ensure that the
+     * initial {@code SETTINGS} frame is sent before this is called. It would
+     * be considered a {@link io.netty.handler.codec.http2.Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
+     * was generated by this method before the initial {@code SETTINGS} frame is sent.
+     * @param stream the stream to which {@code ratio} applies.
+     * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
+     * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
+     */
+    public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
+        assert ctx != null && ctx.executor().inEventLoop();
+        checkValidRatio(ratio);
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+        state.windowUpdateRatio(ratio);
+        state.writeWindowUpdateIfNeeded();
+    }
+
+    /**
+     * The window update ratio is used to determine when a window update must be sent. If the ratio
+     * of bytes processed since the last update has met or exceeded this ratio then a window update will
+     * be sent. This window update ratio will only be applied to {@code stream}.
+     * @throws Http2Exception If no stream corresponding to {@code stream} could be found.
+     */
+    public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
+        return state(stream).windowUpdateRatio();
+    }
+
+    @Override
+    public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
+                                           boolean endOfStream) throws Http2Exception {
+        assert ctx != null && ctx.executor().inEventLoop();
+        int dataLength = data.readableBytes() + padding;
+
+        // Apply the connection-level flow control
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = connectionState();
+        connectionState.receiveFlowControlledFrame(dataLength);
+
+        if (stream != null && !isClosed(stream)) {
+            // Apply the stream-level flow control
+            org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+            state.endOfStream(endOfStream);
+            state.receiveFlowControlledFrame(dataLength);
+        } else if (dataLength > 0) {
+            // Immediately consume the bytes for the connection window.
+            connectionState.consumeBytes(dataLength);
+        }
+    }
+
+    private org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState() {
+        return connection.connectionStream().getProperty(stateKey);
+    }
+
+    private org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state(Http2Stream stream) {
+        return stream.getProperty(stateKey);
+    }
+
+    private static boolean isClosed(Http2Stream stream) {
+        return stream.state() == Http2Stream.State.CLOSED;
+    }
+
+    /**
+     * Flow control state that does autorefill of the flow control window when the data is
+     * received.
+     */
+    private final class AutoRefillState extends org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState {
+        AutoRefillState(Http2Stream stream, int initialWindowSize) {
+            super(stream, initialWindowSize);
+        }
+
+        @Override
+        public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
+            super.receiveFlowControlledFrame(dataLength);
+            // Need to call the super to consume the bytes, since this.consumeBytes does nothing.
+            super.consumeBytes(dataLength);
+        }
+
+        @Override
+        public boolean consumeBytes(int numBytes) throws Http2Exception {
+            // Do nothing, since the bytes are already consumed upon receiving the data.
+            return false;
+        }
+    }
+
+    /**
+     * Flow control window state for an individual stream.
+     */
+    private class DefaultState implements org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState {
+        private final Http2Stream stream;
+
+        /**
+         * The actual flow control window that is decremented as soon as {@code DATA} arrives.
+         */
+        private int window;
+
+        /**
+         * A view of {@link #window} that is used to determine when to send {@code WINDOW_UPDATE}
+         * frames. Decrementing this window for received {@code DATA} frames is delayed until the
+         * application has indicated that the data has been fully processed. This prevents sending
+         * a {@code WINDOW_UPDATE} until the number of processed bytes drops below the threshold.
+         */
+        private int processedWindow;
+
+        /**
+         * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
+         * Each stream has their own initial window size.
+         */
+        private int initialStreamWindowSize;
+
+        /**
+         * This is used to determine when {@link #processedWindow} is sufficiently far away from
+         * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
+         * Each stream has their own window update ratio.
+         */
+        private float streamWindowUpdateRatio;
+
+        private int lowerBound;
+        private boolean endOfStream;
+
+        DefaultState(Http2Stream stream, int initialWindowSize) {
+            this.stream = stream;
+            window(initialWindowSize);
+            streamWindowUpdateRatio = windowUpdateRatio;
+        }
+
+        @Override
+        public void window(int initialWindowSize) {
+            assert ctx == null || ctx.executor().inEventLoop();
+            window = processedWindow = initialStreamWindowSize = initialWindowSize;
+        }
+
+        @Override
+        public int windowSize() {
+            return window;
+        }
+
+        @Override
+        public int initialWindowSize() {
+            return initialStreamWindowSize;
+        }
+
+        @Override
+        public void endOfStream(boolean endOfStream) {
+            this.endOfStream = endOfStream;
+        }
+
+        @Override
+        public float windowUpdateRatio() {
+            return streamWindowUpdateRatio;
+        }
+
+        @Override
+        public void windowUpdateRatio(float ratio) {
+            assert ctx == null || ctx.executor().inEventLoop();
+            streamWindowUpdateRatio = ratio;
+        }
+
+        @Override
+        public void incrementInitialStreamWindow(int delta) {
+            // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range.
+            int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
+                max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
+            delta = newValue - initialStreamWindowSize;
+
+            initialStreamWindowSize += delta;
+        }
+
+        @Override
+        public void incrementFlowControlWindows(int delta) throws Http2Exception {
+            if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
+                throw streamError(stream.id(), FLOW_CONTROL_ERROR,
+                    "Flow control window overflowed for stream: %d", stream.id());
+            }
+
+            window += delta;
+            processedWindow += delta;
+            lowerBound = delta < 0 ? delta : 0;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo] asa3311 commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
asa3311 commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r1000801429


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,648 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY;
+/**
+ * This design is learning from {@see io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} which is in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private Configuration config = ConfigurationUtils.getGlobalConfiguration(
+        ApplicationModel.defaultModel());
+    private int initialWindowSize = config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_SIZE);
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        FlowState connectionState = autoRefillConnectionWindow ?
+                new AutoRefillState(connection.connectionStream(), initialWindowSize) :
+                new DefaultState(connection.connectionStream(), initialWindowSize);
+        connection.connectionStream().setProperty(stateKey, connectionState);
+
+        // Register for notification of new streams.
+        connection.addListener(new Http2ConnectionAdapter() {
+            @Override
+            public void onStreamAdded(Http2Stream stream) {
+                // Unconditionally used the reduced flow control state because it requires no object allocation
+                // and the DefaultFlowState will be allocated in onStreamActive.
+                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+            }
+
+            @Override
+            public void onStreamActive(Http2Stream stream) {
+                // Need to be sure the stream's initial window is adjusted for SETTINGS
+                // frames which may have been exchanged while it was in IDLE
+                stream.setProperty(stateKey, new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(stream, initialWindowSize));

Review Comment:
   done



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,648 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY;
+/**
+ * This design is learning from {@see io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} which is in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private Configuration config = ConfigurationUtils.getGlobalConfiguration(
+        ApplicationModel.defaultModel());
+    private int initialWindowSize = config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_SIZE);
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        FlowState connectionState = autoRefillConnectionWindow ?
+                new AutoRefillState(connection.connectionStream(), initialWindowSize) :
+                new DefaultState(connection.connectionStream(), initialWindowSize);
+        connection.connectionStream().setProperty(stateKey, connectionState);
+
+        // Register for notification of new streams.
+        connection.addListener(new Http2ConnectionAdapter() {
+            @Override
+            public void onStreamAdded(Http2Stream stream) {
+                // Unconditionally used the reduced flow control state because it requires no object allocation
+                // and the DefaultFlowState will be allocated in onStreamActive.
+                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+            }
+
+            @Override
+            public void onStreamActive(Http2Stream stream) {
+                // Need to be sure the stream's initial window is adjusted for SETTINGS
+                // frames which may have been exchanged while it was in IDLE
+                stream.setProperty(stateKey, new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(stream, initialWindowSize));
+            }
+
+            @Override
+            public void onStreamClosed(Http2Stream stream) {
+                try {
+                    // When a stream is closed, consume any remaining bytes so that they
+                    // are restored to the connection window.
+                    org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+                    int unconsumedBytes = state.unconsumedBytes();
+                    if (ctx != null && unconsumedBytes > 0) {
+                        if (consumeAllBytes(state, unconsumedBytes)) {
+                            // As the user has no real control on when this callback is used we should better
+                            // call flush() if we produced any window update to ensure we not stale.
+                            ctx.flush();
+                        }
+                    }
+                } catch (Http2Exception e) {
+                    PlatformDependent.throwException(e);
+                } finally {
+                    // Unconditionally reduce the amount of memory required for flow control because there is no
+                    // object allocation costs associated with doing so and the stream will not have any more
+                    // local flow control state to keep track of anymore.
+                    stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+                }
+            }
+        });
+    }
+
+    @Override
+    public TriHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
+        this.frameWriter = checkNotNull(frameWriter, "frameWriter");
+        return this;
+    }
+
+    @Override
+    public void channelHandlerContext(ChannelHandlerContext ctx) {
+        this.ctx = checkNotNull(ctx, "ctx");
+    }
+
+    @Override
+    public void initialWindowSize(int newWindowSize) throws Http2Exception {
+        assert ctx == null || ctx.executor().inEventLoop();
+        int delta = newWindowSize - initialWindowSize;
+        initialWindowSize = newWindowSize;
+
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor visitor = new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor(delta);

Review Comment:
   done





[GitHub] [dubbo] guohao commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
guohao commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r997653833


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is learning from {@see io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} which is in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = autoRefillConnectionWindow ?
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.AutoRefillState(connection.connectionStream(), initialWindowSize) :
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(connection.connectionStream(), initialWindowSize);
+        connection.connectionStream().setProperty(stateKey, connectionState);
+
+        // Register for notification of new streams.
+        connection.addListener(new Http2ConnectionAdapter() {
+            @Override
+            public void onStreamAdded(Http2Stream stream) {
+                // Unconditionally used the reduced flow control state because it requires no object allocation
+                // and the DefaultFlowState will be allocated in onStreamActive.
+                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+            }
+
+            @Override
+            public void onStreamActive(Http2Stream stream) {
+                // Need to be sure the stream's initial window is adjusted for SETTINGS
+                // frames which may have been exchanged while it was in IDLE
+                stream.setProperty(stateKey, new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(stream, initialWindowSize));
+            }
+
+            @Override
+            public void onStreamClosed(Http2Stream stream) {
+                try {
+                    // When a stream is closed, consume any remaining bytes so that they
+                    // are restored to the connection window.
+                    org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+                    int unconsumedBytes = state.unconsumedBytes();
+                    if (ctx != null && unconsumedBytes > 0) {
+                        if (consumeAllBytes(state, unconsumedBytes)) {
+                            // As the user has no real control on when this callback is used we should better
+                            // call flush() if we produced any window update to ensure we not stale.
+                            ctx.flush();
+                        }
+                    }
+                } catch (Http2Exception e) {
+                    PlatformDependent.throwException(e);
+                } finally {
+                    // Unconditionally reduce the amount of memory required for flow control because there is no
+                    // object allocation costs associated with doing so and the stream will not have any more
+                    // local flow control state to keep track of anymore.
+                    stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+                }
+            }
+        });
+    }
+
+    @Override
+    public TriHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
+        this.frameWriter = checkNotNull(frameWriter, "frameWriter");
+        return this;
+    }
+
+    @Override
+    public void channelHandlerContext(ChannelHandlerContext ctx) {
+        this.ctx = checkNotNull(ctx, "ctx");
+    }
+
+    @Override
+    public void initialWindowSize(int newWindowSize) throws Http2Exception {
+        assert ctx == null || ctx.executor().inEventLoop();
+        int delta = newWindowSize - initialWindowSize;
+        initialWindowSize = newWindowSize;
+
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor visitor = new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor(delta);
+        connection.forEachActiveStream(visitor);
+        visitor.throwIfError();
+    }
+
+    @Override
+    public int initialWindowSize() {
+        return initialWindowSize;
+    }
+
+    @Override
+    public int windowSize(Http2Stream stream) {
+        return state(stream).windowSize();
+    }
+
+    @Override
+    public int initialWindowSize(Http2Stream stream) {
+        return state(stream).initialWindowSize();
+    }
+
+    @Override
+    public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
+        assert ctx != null && ctx.executor().inEventLoop();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+        // Just add the delta to the stream-specific initial window size so that the next time the window
+        // expands it will grow to the new initial size.
+        state.incrementInitialStreamWindow(delta);
+        state.writeWindowUpdateIfNeeded();
+    }
+
+    public boolean consumeTriBytes(Http2Stream stream, int numBytes)  throws Http2Exception {

Review Comment:
   The return value of `consumeTriBytes` seems to be unused.
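
   For context, the quoted `onStreamClosed` callback does use the boolean returned by `consumeAllBytes`: it calls `ctx.flush()` only when a `WINDOW_UPDATE` was produced. A minimal sketch of that caller-side pattern follows. `ByteConsumer`, `Flusher`, and `consumeAndFlush` are hypothetical stand-ins for illustration, not Netty or Dubbo types.

```java
public class ConsumeThenFlushSketch {
    /** Hypothetical stand-in for a flow controller's consume call. */
    public interface ByteConsumer {
        /** @return true if consuming the bytes caused a WINDOW_UPDATE to be written. */
        boolean consumeBytes(int numBytes);
    }

    /** Hypothetical stand-in for ChannelHandlerContext#flush(). */
    public interface Flusher {
        void flush();
    }

    /**
     * Consumes the bytes and flushes only when a window update was written,
     * so the peer learns about the new credit without an unnecessary flush.
     */
    public static boolean consumeAndFlush(ByteConsumer consumer, Flusher flusher, int numBytes) {
        boolean windowUpdateWritten = consumer.consumeBytes(numBytes);
        if (windowUpdateWritten) {
            flusher.flush();
        }
        return windowUpdateWritten;
    }
}
```

   If no caller needs this signal, changing the method to return `void` would be the simpler option.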



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is learning from {@see io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} which is in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;

Review Comment:
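   The window size should be read from configuration instead of being hard-coded to Netty's `DEFAULT_WINDOW_SIZE`:
   `config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_INIT_SIZE)`,
   where the fallback is `org.apache.dubbo.rpc.protocol.tri.TripleHttp2Protocol#DEFAULT_WINDOW_INIT_SIZE`.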
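
   As a rough illustration of the lookup-with-fallback being requested, here is a self-contained sketch that uses a plain `Map` in place of Dubbo's `Configuration`. The key string and the 8 MiB fallback are assumptions made up for this example; the real values are whatever `H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY` and `DEFAULT_WINDOW_INIT_SIZE` define. The clamp to `2^31 - 1` reflects the HTTP/2 limit on flow-control window sizes.

```java
import java.util.Map;

public class WindowSizeConfigSketch {
    // Hypothetical key, for illustration only.
    static final String WINDOW_KEY = "example.h2.initial-window-size";
    // Hypothetical protocol-level fallback (8 MiB), for illustration only.
    static final int PROTOCOL_DEFAULT_WINDOW = 8 * 1024 * 1024;
    // HTTP/2 flow-control windows may not exceed 2^31 - 1.
    static final int MAX_WINDOW = Integer.MAX_VALUE;

    /** Reads the configured window size, falling back to the protocol default. */
    public static int resolveInitialWindowSize(Map<String, String> config) {
        String raw = config.get(WINDOW_KEY);
        if (raw == null) {
            return PROTOCOL_DEFAULT_WINDOW;
        }
        try {
            long parsed = Long.parseLong(raw.trim());
            // Clamp into the legal range instead of failing on out-of-range input.
            return (int) Math.max(0L, Math.min((long) MAX_WINDOW, parsed));
        } catch (NumberFormatException e) {
            // Unparseable values fall back to the default as well.
            return PROTOCOL_DEFAULT_WINDOW;
        }
    }
}
```

   The point of the sketch is only that the flow controller and the SETTINGS frame sent to the peer should resolve the same value from the same place.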



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is adapted from Netty's {@link io.netty.handler.codec.http2.DefaultHttp2LocalFlowController}.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = autoRefillConnectionWindow ?
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.AutoRefillState(connection.connectionStream(), initialWindowSize) :

Review Comment:
   Why not enable AutoRefillState by default?



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is adapted from Netty's {@link io.netty.handler.codec.http2.DefaultHttp2LocalFlowController}.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = autoRefillConnectionWindow ?
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.AutoRefillState(connection.connectionStream(), initialWindowSize) :
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(connection.connectionStream(), initialWindowSize);
+        connection.connectionStream().setProperty(stateKey, connectionState);
+
+        // Register for notification of new streams.
+        connection.addListener(new Http2ConnectionAdapter() {
+            @Override
+            public void onStreamAdded(Http2Stream stream) {
+                // Unconditionally use the reduced flow control state because it requires no object allocation;
+                // the DefaultState will be allocated in onStreamActive.
+                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+            }
+
+            @Override
+            public void onStreamActive(Http2Stream stream) {
+                // Need to be sure the stream's initial window is adjusted for SETTINGS
+                // frames which may have been exchanged while it was in IDLE
+                stream.setProperty(stateKey, new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(stream, initialWindowSize));
+            }
+
+            @Override
+            public void onStreamClosed(Http2Stream stream) {
+                try {
+                    // When a stream is closed, consume any remaining bytes so that they
+                    // are restored to the connection window.
+                    org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+                    int unconsumedBytes = state.unconsumedBytes();
+                    if (ctx != null && unconsumedBytes > 0) {
+                        if (consumeAllBytes(state, unconsumedBytes)) {
+                            // The user has no real control over when this callback runs, so call flush()
+                            // whenever a window update was produced, so that it is not left unsent.
+                            ctx.flush();
+                        }
+                    }
+                } catch (Http2Exception e) {
+                    PlatformDependent.throwException(e);
+                } finally {
+                    // Unconditionally reduce the amount of memory required for flow control because there are no
+                    // object allocation costs associated with doing so and the stream will not have any more
+                    // local flow control state to keep track of.
+                    stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+                }
+            }
+        });
+    }
+
+    @Override
+    public TriHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
+        this.frameWriter = checkNotNull(frameWriter, "frameWriter");
+        return this;
+    }
+
+    @Override
+    public void channelHandlerContext(ChannelHandlerContext ctx) {
+        this.ctx = checkNotNull(ctx, "ctx");
+    }
+
+    @Override
+    public void initialWindowSize(int newWindowSize) throws Http2Exception {
+        assert ctx == null || ctx.executor().inEventLoop();
+        int delta = newWindowSize - initialWindowSize;
+        initialWindowSize = newWindowSize;
+
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor visitor = new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor(delta);
+        connection.forEachActiveStream(visitor);
+        visitor.throwIfError();
+    }
+
+    @Override
+    public int initialWindowSize() {
+        return initialWindowSize;
+    }
+
+    @Override
+    public int windowSize(Http2Stream stream) {
+        return state(stream).windowSize();
+    }
+
+    @Override
+    public int initialWindowSize(Http2Stream stream) {
+        return state(stream).initialWindowSize();
+    }
+
+    @Override
+    public void incrementWindowSize(Http2Stream stream, int delta) throws Http2Exception {
+        assert ctx != null && ctx.executor().inEventLoop();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+        // Just add the delta to the stream-specific initial window size so that the next time the window
+        // expands it will grow to the new initial size.
+        state.incrementInitialStreamWindow(delta);
+        state.writeWindowUpdateIfNeeded();
+    }
+
+    public boolean consumeTriBytes(Http2Stream stream, int numBytes) throws Http2Exception {
+        // Triple consumes bytes from its own thread rather than the event loop, so the
+        // ctx.executor().inEventLoop() check is intentionally omitted here.
+        assert ctx != null;
+        checkPositiveOrZero(numBytes, "numBytes");
+        if (numBytes == 0) {
+            return false;
+        }
+        // Streams automatically consume all remaining bytes when they are closed, so just ignore
+        // if already closed.
+        if (stream != null && !isClosed(stream)) {
+            if (stream.id() == CONNECTION_STREAM_ID) {
+                throw new UnsupportedOperationException("Returning bytes for the connection window is not supported");
+            }
+
+            return consumeAllBytes(state(stream), numBytes);
+        }
+        return false;
+    }
+
+    @Override
+    public boolean consumeBytes(Http2Stream stream, int numBytes) throws Http2Exception {
+        return false;
+    }
+
+    private boolean consumeAllBytes(org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state, int numBytes) throws Http2Exception {
+        return connectionState().consumeBytes(numBytes) | state.consumeBytes(numBytes);
+    }
+
+    @Override
+    public int unconsumedBytes(Http2Stream stream) {
+        return state(stream).unconsumedBytes();
+    }
+
+    private static void checkValidRatio(float ratio) {
+        if (Double.compare(ratio, 0.0) <= 0 || Double.compare(ratio, 1.0) >= 0) {
+            throw new IllegalArgumentException("Invalid ratio: " + ratio);
+        }
+    }
+
+    /**
+     * The window update ratio is used to determine when a window update must be sent. If the ratio
+     * of bytes processed since the last update has met or exceeded this ratio then a window update will
+     * be sent. This is the global window update ratio that will be used for new streams.
+     * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary for new streams.
+     * @throws IllegalArgumentException If the ratio is out of bounds (0, 1).
+     */
+    public void windowUpdateRatio(float ratio) {
+        assert ctx == null || ctx.executor().inEventLoop();
+        checkValidRatio(ratio);
+        windowUpdateRatio = ratio;
+    }
+
+    /**
+     * The window update ratio is used to determine when a window update must be sent. If the ratio
+     * of bytes processed since the last update has met or exceeded this ratio then a window update will
+     * be sent. This is the global window update ratio that will be used for new streams.
+     */
+    public float windowUpdateRatio() {
+        return windowUpdateRatio;
+    }
+
+    /**
+     * The window update ratio is used to determine when a window update must be sent. If the ratio
+     * of bytes processed since the last update has met or exceeded this ratio then a window update will
+     * be sent. This window update ratio will only be applied to {@code stream}.
+     * <p>
+     * Note it is the responsibility of the caller to ensure that the
+     * initial {@code SETTINGS} frame is sent before this is called. It would
+     * be considered a {@link io.netty.handler.codec.http2.Http2Error#PROTOCOL_ERROR} if a {@code WINDOW_UPDATE}
+     * was generated by this method before the initial {@code SETTINGS} frame is sent.
+     * @param stream the stream to which {@code ratio} applies.
+     * @param ratio the ratio to use when checking if a {@code WINDOW_UPDATE} is determined necessary.
+     * @throws Http2Exception If a protocol-error occurs while generating {@code WINDOW_UPDATE} frames
+     */
+    public void windowUpdateRatio(Http2Stream stream, float ratio) throws Http2Exception {
+        assert ctx != null && ctx.executor().inEventLoop();
+        checkValidRatio(ratio);
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+        state.windowUpdateRatio(ratio);
+        state.writeWindowUpdateIfNeeded();
+    }
+
+    /**
+     * The window update ratio is used to determine when a window update must be sent. If the ratio
+     * of bytes processed since the last update has met or exceeded this ratio then a window update will
+     * be sent. This window update ratio will only be applied to {@code stream}.
+     * @throws Http2Exception If no stream corresponding to {@code stream} could be found.
+     */
+    public float windowUpdateRatio(Http2Stream stream) throws Http2Exception {
+        return state(stream).windowUpdateRatio();
+    }
+
+    @Override
+    public void receiveFlowControlledFrame(Http2Stream stream, ByteBuf data, int padding,
+                                           boolean endOfStream) throws Http2Exception {
+        assert ctx != null && ctx.executor().inEventLoop();
+        int dataLength = data.readableBytes() + padding;
+
+        // Apply the connection-level flow control
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = connectionState();
+        connectionState.receiveFlowControlledFrame(dataLength);
+
+        if (stream != null && !isClosed(stream)) {
+            // Apply the stream-level flow control
+            org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+            state.endOfStream(endOfStream);
+            state.receiveFlowControlledFrame(dataLength);
+        } else if (dataLength > 0) {
+            // Immediately consume the bytes for the connection window.
+            connectionState.consumeBytes(dataLength);
+        }
+    }
+
+    private org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState() {
+        return connection.connectionStream().getProperty(stateKey);
+    }
+
+    private org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state(Http2Stream stream) {
+        return stream.getProperty(stateKey);
+    }
+
+    private static boolean isClosed(Http2Stream stream) {
+        return stream.state() == Http2Stream.State.CLOSED;
+    }
+
+    /**
+     * Flow control state that automatically refills the flow control window when data is
+     * received.
+     */
+    private final class AutoRefillState extends org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState {
+        AutoRefillState(Http2Stream stream, int initialWindowSize) {
+            super(stream, initialWindowSize);
+        }
+
+        @Override
+        public void receiveFlowControlledFrame(int dataLength) throws Http2Exception {
+            super.receiveFlowControlledFrame(dataLength);
+            // Need to call the super to consume the bytes, since this.consumeBytes does nothing.
+            super.consumeBytes(dataLength);
+        }
+
+        @Override
+        public boolean consumeBytes(int numBytes) throws Http2Exception {
+            // Do nothing, since the bytes are already consumed upon receiving the data.
+            return false;
+        }
+    }
+
+    /**
+     * Flow control window state for an individual stream.
+     */
+    private class DefaultState implements org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState {
+        private final Http2Stream stream;
+
+        /**
+         * The actual flow control window that is decremented as soon as {@code DATA} arrives.
+         */
+        private int window;
+
+        /**
+         * A view of {@link #window} that is used to determine when to send {@code WINDOW_UPDATE}
+         * frames. Decrementing this window for received {@code DATA} frames is delayed until the
+         * application has indicated that the data has been fully processed. This prevents sending
+         * a {@code WINDOW_UPDATE} until the number of processed bytes drops below the threshold.
+         */
+        private int processedWindow;
+
+        /**
+         * This is what is used to determine how many bytes need to be returned relative to {@link #processedWindow}.
+         * Each stream has its own initial window size.
+         */
+        private int initialStreamWindowSize;
+
+        /**
+         * This is used to determine when {@link #processedWindow} is sufficiently far away from
+         * {@link #initialStreamWindowSize} such that a {@code WINDOW_UPDATE} should be sent.
+         * Each stream has its own window update ratio.
+         */
+        private float streamWindowUpdateRatio;
+
+        private int lowerBound;
+        private boolean endOfStream;
+
+        DefaultState(Http2Stream stream, int initialWindowSize) {
+            this.stream = stream;
+            window(initialWindowSize);
+            streamWindowUpdateRatio = windowUpdateRatio;
+        }
+
+        @Override
+        public void window(int initialWindowSize) {
+            assert ctx == null || ctx.executor().inEventLoop();
+            window = processedWindow = initialStreamWindowSize = initialWindowSize;
+        }
+
+        @Override
+        public int windowSize() {
+            return window;
+        }
+
+        @Override
+        public int initialWindowSize() {
+            return initialStreamWindowSize;
+        }
+
+        @Override
+        public void endOfStream(boolean endOfStream) {
+            this.endOfStream = endOfStream;
+        }
+
+        @Override
+        public float windowUpdateRatio() {
+            return streamWindowUpdateRatio;
+        }
+
+        @Override
+        public void windowUpdateRatio(float ratio) {
+            assert ctx == null || ctx.executor().inEventLoop();
+            streamWindowUpdateRatio = ratio;
+        }
+
+        @Override
+        public void incrementInitialStreamWindow(int delta) {
+            // Clip the delta so that the resulting initialStreamWindowSize falls within the allowed range.
+            int newValue = (int) min(MAX_INITIAL_WINDOW_SIZE,
+                max(MIN_INITIAL_WINDOW_SIZE, initialStreamWindowSize + (long) delta));
+            delta = newValue - initialStreamWindowSize;
+
+            initialStreamWindowSize += delta;
+        }
+
+        @Override
+        public void incrementFlowControlWindows(int delta) throws Http2Exception {
+            if (delta > 0 && window > MAX_INITIAL_WINDOW_SIZE - delta) {
+                throw streamError(stream.id(), FLOW_CONTROL_ERROR,
+                    "Flow control window overflowed for stream: %d", stream.id());
+            }
+
+            window += delta;
+            processedWindow += delta;
+            lowerBound = delta < 0 ? delta : 0;

Review Comment:
   
   ```suggestion
               lowerBound = Math.min(delta, 0);
   ```
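
For reference, the suggestion is behavior-preserving. The following is a minimal, standalone sketch (the class name `LowerBoundSketch` and its methods are hypothetical and not part of the PR) that compares the ternary from the diff with the `Math.min` form over a few boundary values. Because the file already has `import static java.lang.Math.min`, the shorter `min(delta, 0)` would presumably compile there as well.

```java
// Hypothetical standalone check; not code from the PR.
final class LowerBoundSketch {
    // Form currently in the diff.
    static int ternary(int delta) {
        return delta < 0 ? delta : 0;
    }

    // Form proposed in the review suggestion.
    static int viaMin(int delta) {
        return Math.min(delta, 0);
    }

    public static void main(String[] args) {
        int[] samples = {Integer.MIN_VALUE, -65535, -1, 0, 1, 65535, Integer.MAX_VALUE};
        for (int delta : samples) {
            if (ternary(delta) != viaMin(delta)) {
                throw new AssertionError("mismatch for delta=" + delta);
            }
        }
        System.out.println("both forms agree on all samples");
    }
}
```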



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is adapted from Netty's {@link io.netty.handler.codec.http2.DefaultHttp2LocalFlowController}.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = autoRefillConnectionWindow ?
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.AutoRefillState(connection.connectionStream(), initialWindowSize) :

Review Comment:
   Remove unnecessary qualification
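
As an illustration of this comment: inside the outer class, nested types can be referenced by their simple names, so the `org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.` prefix adds nothing. The sketch below uses hypothetical names (`QualificationSketch`, and static nested stand-ins for `FlowState` and `DefaultState`, which are inner classes in the PR) to show that both spellings resolve to the same type.

```java
// Hypothetical stand-in types; in the PR, DefaultState is a non-static inner class.
final class QualificationSketch {
    interface FlowState {
        int windowSize();
    }

    static final class DefaultState implements FlowState {
        private final int window;

        DefaultState(int window) {
            this.window = window;
        }

        @Override
        public int windowSize() {
            return window;
        }
    }

    // Qualified spelling, analogous to what the diff currently uses.
    static QualificationSketch.FlowState qualified(int window) {
        return new QualificationSketch.DefaultState(window);
    }

    // Simple-name spelling, as the review comment asks for.
    static FlowState simple(int window) {
        return new DefaultState(window);
    }
}
```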



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org


[GitHub] [dubbo] AlbumenJ merged pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
AlbumenJ merged PR #10748:
URL: https://github.com/apache/dubbo/pull/10748




[GitHub] [dubbo] asa3311 commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
asa3311 commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r997861525


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is adapted from Netty's {@link io.netty.handler.codec.http2.DefaultHttp2LocalFlowController}.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = autoRefillConnectionWindow ?
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.AutoRefillState(connection.connectionStream(), initialWindowSize) :

Review Comment:
   > Why not enable AutoRefillState by default?
   
   This code is from Netty.
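
To make the trade-off behind the question concrete, here is a simplified, hypothetical model of the connection-level window (`ConnectionWindowSketch` is not a class in the PR or in Netty, and it ignores stream state, error handling and frame writing). It assumes the same threshold rule that the field Javadoc in the diff describes: a window update is issued once the processed window drops to or below `initialWindow * ratio`. With auto-refill, bytes are returned as soon as they are received, so the connection window never holds the sender back; without it, the window only grows again after the application reports the bytes as consumed.

```java
// Hypothetical, simplified model; not the PR's or Netty's implementation.
final class ConnectionWindowSketch {
    private final int initialWindow;
    private final float ratio;
    private final boolean autoRefill;
    private int window;           // shrinks as soon as data arrives
    private int processedWindow;  // shrinks only when bytes are returned
    private int windowUpdatesSent;

    ConnectionWindowSketch(int initialWindow, float ratio, boolean autoRefill) {
        this.initialWindow = initialWindow;
        this.ratio = ratio;
        this.autoRefill = autoRefill;
        this.window = initialWindow;
        this.processedWindow = initialWindow;
    }

    // Models receipt of a flow-controlled frame.
    void receive(int dataLength) {
        if (dataLength > window) {
            throw new IllegalStateException("flow control window exceeded");
        }
        window -= dataLength;
        if (autoRefill) {
            returnBytes(dataLength); // bytes are returned immediately
        }
    }

    // Models the application reporting processed bytes; a no-op with auto-refill.
    boolean consume(int numBytes) {
        return !autoRefill && returnBytes(numBytes);
    }

    private boolean returnBytes(int numBytes) {
        processedWindow -= numBytes;
        int threshold = (int) (initialWindow * ratio);
        if (processedWindow > threshold) {
            return false; // not enough processed yet, no update
        }
        int delta = initialWindow - processedWindow;
        window += delta;
        processedWindow += delta;
        windowUpdatesSent++; // stands in for writing a WINDOW_UPDATE frame
        return true;
    }

    int window() {
        return window;
    }

    int windowUpdatesSent() {
        return windowUpdatesSent;
    }
}
```

In this model, a non-auto-refill window of 100 that receives 60 bytes stays at 40 until `consume(60)` is called, while the auto-refill variant is back at 100 immediately after `receive(60)`. Whether that back-pressure is wanted at the connection level is the design question the default value decides.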



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is learning from {@see io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} which is in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;

Review Comment:
   done





[GitHub] [dubbo] asa3311 commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
asa3311 commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r997861038


##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,644 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+
+
+/**
+ * This design is learning from {@see io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} which is in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private int initialWindowSize = DEFAULT_WINDOW_SIZE;
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState connectionState = autoRefillConnectionWindow ?
+            new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.AutoRefillState(connection.connectionStream(), initialWindowSize) :

Review Comment:
   > Remove unnecessary qualification
   
   done





[GitHub] [dubbo] guohao commented on a diff in pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
guohao commented on code in PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#discussion_r1000075963


##########
dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcServiceContext.java:
##########
@@ -186,6 +186,9 @@ public List<URL> getUrls() {
     @Override
     public void setUrls(List<URL> urls) {
         this.urls = urls;
+        if (!urls.isEmpty()) {

Review Comment:
   This change seems unrelated to this PR.



##########
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java:
##########
@@ -99,7 +99,14 @@ protected Result doInvoke(final Invocation invocation) throws Throwable {
         }
         try {
             boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
+
             int timeout = calculateTimeout(invocation, methodName);
+            if (timeout <= 0) {

Review Comment:
   This change seems unrelated to this PR.



##########
dubbo-config/dubbo-config-spring/src/main/java/org/apache/dubbo/config/spring/beans/factory/annotation/ServiceAnnotationPostProcessor.java:
##########
@@ -354,7 +355,12 @@ private void processScannedBeanDefinition(BeanDefinitionHolder beanDefinitionHol
     private Annotation findServiceAnnotation(Class<?> beanClass) {
         return serviceAnnotationTypes
                 .stream()
-                .map(annotationType -> findAnnotation(beanClass, annotationType))
+            .map(annotationType ->

Review Comment:
   This change seems unrelated to this PR.



##########
dubbo-rpc/dubbo-rpc-injvm/src/main/java/org/apache/dubbo/rpc/protocol/injvm/InjvmInvoker.java:
##########
@@ -102,7 +102,15 @@ public Result doInvoke(Invocation invocation) throws Throwable {
         if (serverHasToken) {
             invocation.setAttachment(Constants.TOKEN_KEY, serverURL.getParameter(Constants.TOKEN_KEY));
         }
-        invocation.setAttachment(TIMEOUT_KEY, calculateTimeout(invocation, invocation.getMethodName()));
+
+        int timeout = calculateTimeout(invocation, invocation.getMethodName());

Review Comment:
   This change seems unrelated to this PR.



##########
dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboInvoker.java:
##########
@@ -108,7 +115,7 @@ protected Result doInvoke(final Invocation invocation) throws Throwable {
             } else {
                 ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                 CompletableFuture<AppResponse> appResponseFuture =
-                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
+                    currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);

Review Comment:
   No need to reformat this line.



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleInvoker.java:
##########
@@ -192,8 +192,15 @@ StreamObserver<Object> streamCall(ClientCall call,
     AsyncRpcResult invokeUnary(MethodDescriptor methodDescriptor, Invocation invocation,
         ClientCall call) {
         ExecutorService callbackExecutor = getCallbackExecutor(getUrl(), invocation);
+
         int timeout = calculateTimeout(invocation, invocation.getMethodName());
+        if (timeout <= 0) {

Review Comment:
   Same question: this seems unrelated. The check is also repeated across invokers, so keep it DRY.
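
One way to address the DRY point would be to move the repeated check into a single helper. The sketch below is only an illustration: `TimeoutChecks` and `requirePositive` are hypothetical names that do not exist in Dubbo, and since the body of the `if (timeout <= 0)` branch is not visible in the quoted hunks, the sketch simply assumes that a non-positive timeout should fail the call.

```java
// Hypothetical helper; neither the class nor the method exists in Dubbo.
final class TimeoutChecks {
    private TimeoutChecks() {
    }

    // Assumption: a non-positive timeout should fail the call. The actual body
    // of the `if (timeout <= 0)` branch is not shown in the quoted hunks.
    static int requirePositive(int timeout, String methodName) {
        if (timeout <= 0) {
            throw new IllegalStateException(
                "No time left for invoking method " + methodName + ", timeout: " + timeout);
        }
        return timeout;
    }
}
```

Each invoker could then wrap its existing call, for example `int timeout = TimeoutChecks.requirePositive(calculateTimeout(invocation, methodName), methodName);`, so the rule lives in one place.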



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java:
##########
@@ -48,6 +48,7 @@
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 
+

Review Comment:
   No need to reformat here.



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,648 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY;
+/**
+ * This design is learning from {@see io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} which is in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private Configuration config = ConfigurationUtils.getGlobalConfiguration(
+        ApplicationModel.defaultModel());
+    private int initialWindowSize = config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_SIZE);
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        FlowState connectionState = autoRefillConnectionWindow ?
+                new AutoRefillState(connection.connectionStream(), initialWindowSize) :
+                new DefaultState(connection.connectionStream(), initialWindowSize);
+        connection.connectionStream().setProperty(stateKey, connectionState);
+
+        // Register for notification of new streams.
+        connection.addListener(new Http2ConnectionAdapter() {
+            @Override
+            public void onStreamAdded(Http2Stream stream) {
+                // Unconditionally used the reduced flow control state because it requires no object allocation
+                // and the DefaultFlowState will be allocated in onStreamActive.
+                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+            }
+
+            @Override
+            public void onStreamActive(Http2Stream stream) {
+                // Need to be sure the stream's initial window is adjusted for SETTINGS
+                // frames which may have been exchanged while it was in IDLE
+                stream.setProperty(stateKey, new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(stream, initialWindowSize));

Review Comment:
   Use the simple class name (or an import) instead of the fully qualified name.
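
To illustrate the point: a nested type can be referenced by its simple name from inside its enclosing class, so neither a package-qualified name nor an import is needed there. The sketch uses a hypothetical stand-in class, `FlowControllerSketch`, rather than the real controller.

```java
// Hypothetical stand-in for the controller; only the naming pattern matters here.
class FlowControllerSketch {
    // Nested type, analogous to DefaultState in the quoted file.
    static final class DefaultState {
        final int windowSize;

        DefaultState(int windowSize) {
            this.windowSize = windowSize;
        }
    }

    // Inside the enclosing class the simple name resolves without any import
    // or package qualification.
    static DefaultState newState(int initialWindowSize) {
        return new DefaultState(initialWindowSize);
    }
}
```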



##########
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriHttp2LocalFlowController.java:
##########
@@ -0,0 +1,648 @@
+/*
+ * Copyright 2014 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.dubbo.rpc.protocol.tri;
+import static io.netty.handler.codec.http2.Http2CodecUtil.CONNECTION_STREAM_ID;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MAX_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.MIN_INITIAL_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2CodecUtil.DEFAULT_WINDOW_SIZE;
+import static io.netty.handler.codec.http2.Http2Error.FLOW_CONTROL_ERROR;
+import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
+import static io.netty.handler.codec.http2.Http2Exception.connectionError;
+import static io.netty.handler.codec.http2.Http2Exception.streamError;
+import static io.netty.util.internal.ObjectUtil.checkNotNull;
+import static io.netty.util.internal.ObjectUtil.checkPositiveOrZero;
+import static java.lang.Math.max;
+import static java.lang.Math.min;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.Http2StreamVisitor;
+import io.netty.handler.codec.http2.Http2Exception;
+import io.netty.handler.codec.http2.Http2ConnectionAdapter;
+import io.netty.handler.codec.http2.Http2Connection;
+import io.netty.handler.codec.http2.Http2Exception.CompositeStreamException;
+import io.netty.handler.codec.http2.Http2Exception.StreamException;
+import io.netty.handler.codec.http2.Http2FrameWriter;
+import io.netty.handler.codec.http2.Http2LocalFlowController;
+import io.netty.handler.codec.http2.Http2Stream;
+import io.netty.util.internal.PlatformDependent;
+import io.netty.util.internal.UnstableApi;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY;
+/**
+ * This design is learning from {@see io.netty.handler.codec.http2.DefaultHttp2LocalFlowController} which is in Netty.
+ */
+@UnstableApi
+public class TriHttp2LocalFlowController implements Http2LocalFlowController {
+    /**
+     * The default ratio of window size to initial window size below which a {@code WINDOW_UPDATE}
+     * is sent to expand the window.
+     */
+    public static final float DEFAULT_WINDOW_UPDATE_RATIO = 0.5f;
+
+    private final Http2Connection connection;
+    private final Http2Connection.PropertyKey stateKey;
+    private Http2FrameWriter frameWriter;
+    private ChannelHandlerContext ctx;
+    private float windowUpdateRatio;
+    private Configuration config = ConfigurationUtils.getGlobalConfiguration(
+        ApplicationModel.defaultModel());
+    private int initialWindowSize = config.getInt(H2_SETTINGS_INITIAL_WINDOW_SIZE_KEY, DEFAULT_WINDOW_SIZE);
+
+    public TriHttp2LocalFlowController(Http2Connection connection) {
+        this(connection, DEFAULT_WINDOW_UPDATE_RATIO, false);
+    }
+
+    public TriHttp2LocalFlowController(Http2Connection connection,
+                                       float windowUpdateRatio,
+                                       boolean autoRefillConnectionWindow) {
+        this.connection = checkNotNull(connection, "connection");
+        windowUpdateRatio(windowUpdateRatio);
+
+        // Add a flow state for the connection.
+        stateKey = connection.newKey();
+        FlowState connectionState = autoRefillConnectionWindow ?
+                new AutoRefillState(connection.connectionStream(), initialWindowSize) :
+                new DefaultState(connection.connectionStream(), initialWindowSize);
+        connection.connectionStream().setProperty(stateKey, connectionState);
+
+        // Register for notification of new streams.
+        connection.addListener(new Http2ConnectionAdapter() {
+            @Override
+            public void onStreamAdded(Http2Stream stream) {
+                // Unconditionally used the reduced flow control state because it requires no object allocation
+                // and the DefaultFlowState will be allocated in onStreamActive.
+                stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+            }
+
+            @Override
+            public void onStreamActive(Http2Stream stream) {
+                // Need to be sure the stream's initial window is adjusted for SETTINGS
+                // frames which may have been exchanged while it was in IDLE
+                stream.setProperty(stateKey, new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.DefaultState(stream, initialWindowSize));
+            }
+
+            @Override
+            public void onStreamClosed(Http2Stream stream) {
+                try {
+                    // When a stream is closed, consume any remaining bytes so that they
+                    // are restored to the connection window.
+                    org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.FlowState state = state(stream);
+                    int unconsumedBytes = state.unconsumedBytes();
+                    if (ctx != null && unconsumedBytes > 0) {
+                        if (consumeAllBytes(state, unconsumedBytes)) {
+                            // As the user has no real control on when this callback is used we should better
+                            // call flush() if we produced any window update to ensure we not stale.
+                            ctx.flush();
+                        }
+                    }
+                } catch (Http2Exception e) {
+                    PlatformDependent.throwException(e);
+                } finally {
+                    // Unconditionally reduce the amount of memory required for flow control because there is no
+                    // object allocation costs associated with doing so and the stream will not have any more
+                    // local flow control state to keep track of anymore.
+                    stream.setProperty(stateKey, REDUCED_FLOW_STATE);
+                }
+            }
+        });
+    }
+
+    @Override
+    public TriHttp2LocalFlowController frameWriter(Http2FrameWriter frameWriter) {
+        this.frameWriter = checkNotNull(frameWriter, "frameWriter");
+        return this;
+    }
+
+    @Override
+    public void channelHandlerContext(ChannelHandlerContext ctx) {
+        this.ctx = checkNotNull(ctx, "ctx");
+    }
+
+    @Override
+    public void initialWindowSize(int newWindowSize) throws Http2Exception {
+        assert ctx == null || ctx.executor().inEventLoop();
+        int delta = newWindowSize - initialWindowSize;
+        initialWindowSize = newWindowSize;
+
+        org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor visitor = new org.apache.dubbo.rpc.protocol.tri.TriHttp2LocalFlowController.WindowUpdateVisitor(delta);

Review Comment:
   Same here: use the simple class name instead of the fully qualified name.





[GitHub] [dubbo] codecov-commenter commented on pull request #10748: triple flowcontrol

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #10748:
URL: https://github.com/apache/dubbo/pull/10748#issuecomment-1278366245

   # [Codecov](https://codecov.io/gh/apache/dubbo/pull/10748?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#10748](https://codecov.io/gh/apache/dubbo/pull/10748?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (63af477) into [3.2](https://codecov.io/gh/apache/dubbo/commit/ce9953e98dc65d156f21e18aa6724014908ef7b1?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ce9953e) will **increase** coverage by `0.03%`.
   > The diff coverage is `57.98%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##                3.2   #10748      +/-   ##
   ============================================
   + Coverage     65.31%   65.35%   +0.03%     
     Complexity      393      393              
   ============================================
     Files          1338     1341       +3     
     Lines         57046    57854     +808     
     Branches       8429     8563     +134     
   ============================================
   + Hits          37260    37810     +550     
   - Misses        15838    16033     +195     
   - Partials       3948     4011      +63     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/dubbo/pull/10748?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...bo/rpc/protocol/tri/stream/TripleClientStream.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvc3RyZWFtL1RyaXBsZUNsaWVudFN0cmVhbS5qYXZh) | `53.79% <50.00%> (ø)` | |
   | [...rpc/protocol/tri/TriHttp2RemoteFlowController.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvVHJpSHR0cDJSZW1vdGVGbG93Q29udHJvbGxlci5qYXZh) | `51.79% <51.79%> (ø)` | |
   | [.../rpc/protocol/tri/TriHttp2LocalFlowController.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvVHJpSHR0cDJMb2NhbEZsb3dDb250cm9sbGVyLmphdmE=) | `55.24% <55.24%> (ø)` | |
   | [...dubbo/rpc/protocol/tri/TripleFlowControlFrame.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvVHJpcGxlRmxvd0NvbnRyb2xGcmFtZS5qYXZh) | `61.90% <61.90%> (ø)` | |
   | [.../protocol/tri/call/BiStreamServerCallListener.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvY2FsbC9CaVN0cmVhbVNlcnZlckNhbGxMaXN0ZW5lci5qYXZh) | `70.37% <66.66%> (-4.63%)` | :arrow_down: |
   | [.../protocol/tri/call/AbstractServerCallListener.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvY2FsbC9BYnN0cmFjdFNlcnZlckNhbGxMaXN0ZW5lci5qYXZh) | `63.63% <75.00%> (+1.13%)` | :arrow_up: |
   | [...tocol/tri/call/ServerStreamServerCallListener.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvY2FsbC9TZXJ2ZXJTdHJlYW1TZXJ2ZXJDYWxsTGlzdGVuZXIuamF2YQ==) | `76.47% <80.00%> (+3.74%)` | :arrow_up: |
   | [...pache/dubbo/rpc/protocol/tri/frame/TriDecoder.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvZnJhbWUvVHJpRGVjb2Rlci5qYXZh) | `83.58% <91.66%> (+2.22%)` | :arrow_up: |
   | [...he/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvVHJpcGxlSHR0cDJQcm90b2NvbC5qYXZh) | `92.30% <100.00%> (+0.64%)` | :arrow_up: |
   | [...ubbo/rpc/protocol/tri/call/AbstractServerCall.java](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-ZHViYm8tcnBjL2R1YmJvLXJwYy10cmlwbGUvc3JjL21haW4vamF2YS9vcmcvYXBhY2hlL2R1YmJvL3JwYy9wcm90b2NvbC90cmkvY2FsbC9BYnN0cmFjdFNlcnZlckNhbGwuamF2YQ==) | `51.95% <100.00%> (+0.54%)` | :arrow_up: |
   | ... and [30 more](https://codecov.io/gh/apache/dubbo/pull/10748/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   

