You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/11/23 19:23:56 UTC
[2/2] camel git commit: CAMEL-10520 fix the acknowledgment mechanism
for lumberjack protocol v1
CAMEL-10520 fix the acknowledgment mechanism for lumberjack protocol v1
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6145dcd2
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6145dcd2
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6145dcd2
Branch: refs/heads/camel-2.18.x
Commit: 6145dcd25c247606c675efd54e71ecaa939a5772
Parents: 466bcab
Author: xfournet <xa...@gmail.com>
Authored: Wed Nov 23 19:53:12 2016 +0100
Committer: xfournet <xa...@gmail.com>
Committed: Wed Nov 23 20:09:58 2016 +0100
----------------------------------------------------------------------
.../component/lumberjack/io/LumberjackSessionHandler.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/6145dcd2/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
----------------------------------------------------------------------
diff --git a/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java b/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
index 6c16656..3f184ca 100644
--- a/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
+++ b/components/camel-lumberjack/src/main/java/org/apache/camel/component/lumberjack/io/LumberjackSessionHandler.java
@@ -31,9 +31,11 @@ import static org.apache.camel.component.lumberjack.io.LumberjackConstants.VERSI
*/
final class LumberjackSessionHandler {
private static final Logger LOG = LoggerFactory.getLogger(LumberjackSessionHandler.class);
+ private static final int ACK_UNSET = -1;
private volatile int version = -1;
private volatile int windowSize = 1;
+ private volatile int nextAck = ACK_UNSET;
void versionRead(int version) {
if (this.version == -1) {
@@ -50,10 +52,15 @@ final class LumberjackSessionHandler {
void windowSizeRead(int windowSize) {
LOG.debug("Lumberjack window size is {}", windowSize);
this.windowSize = windowSize;
+ nextAck = ACK_UNSET;
}
void notifyMessageProcessed(ChannelHandlerContext ctx, int sequenceNumber) {
- if (sequenceNumber == windowSize) {
+ if (nextAck == ACK_UNSET) {
+ nextAck = sequenceNumber + windowSize - 1;
+ }
+
+ if (sequenceNumber == nextAck) {
LOG.debug("Sequence number is {}. Sending ACK", sequenceNumber);
ByteBuf response = ctx.alloc().heapBuffer(FRAME_ACKNOWLEDGE_LENGTH, FRAME_ACKNOWLEDGE_LENGTH);
response.writeByte(version);