Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/21 07:21:17 UTC

[GitHub] [pulsar] codelipenghui opened a new pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

codelipenghui opened a new pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255


   ### Motivation
   
   Fix peeking message metadata in the broker when broker entry metadata is enabled.
   
   When broker entry metadata is enabled, the following error occurs:
   
   ```
   22:09:57.802 [broker-topic-workers-OrderedScheduler-4-0:org.apache.pulsar.common.protocol.Commands@1658] ERROR org.apache.pulsar.common.protocol.Commands - [PersistentSubscription{topic=persistent://public/default/__consumer_offsets-partition-0, name=reader-31a9742e6c}] [-1] Failed to parse message metadata
   java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
   	at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:425) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:415) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1653) ~[pulsar-common-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:82) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:232) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.lambda$readEntriesComplete$1(PersistentDispatcherSingleActiveConsumer.java:178) ~[pulsar-broker-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) [managed-ledger-2.8.0-rc-202101192246.jar:2.8.0-rc-202101192246]
   	at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) [bookkeeper-common-4.12.1.jar:4.12.1]
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_261]
   	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [?:1.8.0_261]
   	at java.util.concurrent.FutureTask.run(FutureTask.java) [?:1.8.0_261]
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_261]
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_261]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_261]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_261]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [netty-common-4.1.51.Final.jar:4.1.51.Final]
   	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_261]
   ```
   
   The root cause is that peeking the message metadata does not first skip the broker entry metadata.
   
   ### Modifications
   
   Skip the broker entry metadata, if it exists, when peeking message metadata.
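
The skip-then-peek idea can be sketched as follows. This is a minimal illustration that uses `java.nio.ByteBuffer` instead of Netty's `ByteBuf` so it runs standalone. The prefix layout (a two-byte marker, a four-byte size, then the broker entry metadata bytes), the marker value, and the names `PREFIX_MAGIC`, `skipPrefixIfPresent` and `peekFirstMetadataByte` are assumptions made for the example, not the code in this pull request:

```java
import java.nio.ByteBuffer;

public class SkipPrefixSketch {
    // Hypothetical marker value, used here only for illustration.
    static final short PREFIX_MAGIC = 0x0e02;

    /** Advances the buffer past an optional [marker:2][size:4][body:size] prefix. */
    static void skipPrefixIfPresent(ByteBuffer buf) {
        int start = buf.position();
        if (buf.remaining() < 6 || buf.getShort(start) != PREFIX_MAGIC) {
            return; // no prefix: leave the position untouched
        }
        int size = buf.getInt(start + 2);
        if (size < 0 || size > buf.remaining() - 6) {
            throw new IllegalArgumentException("bad prefix size: " + size);
        }
        buf.position(start + 6 + size);
    }

    /** Peeks at the first byte after the prefix and restores the read position. */
    static byte peekFirstMetadataByte(ByteBuffer buf) {
        int saved = buf.position();
        try {
            skipPrefixIfPresent(buf);
            return buf.get(buf.position());
        } finally {
            buf.position(saved); // a peek must not consume the buffer
        }
    }
}
```

Restoring the saved position in a `finally` block keeps the peek side-effect free even if parsing fails, which mirrors the `readerIndex` save and restore visible in the diff under review.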
   
   ### Verifying this change
   
   Tests added.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API: (no)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
     - Does this pull request introduce a new feature? (no)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] BewareMyPower commented on a change in pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#discussion_r561663761



##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -1660,6 +1660,26 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
         }
     }
 
+    public static final String NONE_KEY = "NONE_KEY";
+    public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
+        try {
+            int readerIdx = metadataAndPayload.readerIndex();
+            skipBrokerEntryMetadataIfExist(metadataAndPayload);
+            MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+            metadataAndPayload.readerIndex(readerIdx);
+            byte[] key = NONE_KEY.getBytes();
+            if (metadata.hasOrderingKey()) {
+                return metadata.getOrderingKey();
+            } else if (metadata.hasPartitionKey()) {
+                return metadata.getPartitionKey().getBytes();
+            }
+            return key;

Review comment:
       Change `getBytes()` to `getBytes(StandardCharsets.UTF_8)` to fix the SpotBugs check. It looks like the `pulsar-broker` module doesn't enable the SpotBugs check yet, so the same code passed the check in `pulsar-broker` but failed in `pulsar-common`.
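
For context, a minimal sketch of the suggested change. `String.getBytes()` with no argument encodes with the JVM's default charset, so the resulting bytes can differ between machines; SpotBugs reports this as reliance on the default encoding. The class and method names below are hypothetical and only illustrate the explicit-charset form:

```java
import java.nio.charset.StandardCharsets;

public class ExplicitCharsetSketch {
    /** Hypothetical helper: encodes a key with an explicit charset. */
    static byte[] encodeKey(String key) {
        // The result no longer depends on the JVM's default encoding.
        return key.getBytes(StandardCharsets.UTF_8);
    }
}
```

With the explicit charset, a non-ASCII key such as `"\u00e9"` always encodes to the same two UTF-8 bytes regardless of the platform default.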

##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.assertj.core.util.Sets;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for the broker entry metadata.
+ */
+public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
+
+    @DataProvider(name = "subscriptionTypes")
+    public static Object[] subscriptionTypes() {
+        return new Object[] {
+                SubscriptionType.Exclusive,
+                SubscriptionType.Failover,
+                SubscriptionType.Shared,
+                SubscriptionType.Key_Shared
+        };
+    }
+
+    @BeforeClass
+    protected void setup() throws Exception {
+        conf.setBrokerEntryMetadataInterceptors(Sets.newTreeSet(
+                "org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"

Review comment:
       Should we also add `org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor` to test multiple `BrokerEntryMetadataInterceptors`?

##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -1660,6 +1661,26 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
         }
     }
 
+    public static final String NONE_KEY = "NONE_KEY";
+    public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
+        try {
+            int readerIdx = metadataAndPayload.readerIndex();
+            skipBrokerEntryMetadataIfExist(metadataAndPayload);
+            MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+            metadataAndPayload.readerIndex(readerIdx);
+            byte[] key = NONE_KEY.getBytes();

Review comment:
       I think we should also use `getBytes(StandardCharsets.UTF_8)` here.







[GitHub] [pulsar] codelipenghui commented on pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#issuecomment-766276534


   /pulsarbot run-failure-checks





[GitHub] [pulsar] hangc0276 commented on pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#issuecomment-766282506


   /pulsarbot run-failure-checks
   
   





[GitHub] [pulsar] codelipenghui commented on pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#issuecomment-766311439


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#issuecomment-766041178


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui commented on pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#issuecomment-765479752


   /pulsarbot run-failure-checks





[GitHub] [pulsar] codelipenghui merged pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255


   





[GitHub] [pulsar] codelipenghui commented on pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#issuecomment-765892362


   /pulsarbot run-failure-checks





[GitHub] [pulsar] BewareMyPower commented on a change in pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#discussion_r561671434



##########
File path: pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
##########
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import lombok.Cleanup;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.assertj.core.util.Sets;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test for the broker entry metadata.
+ */
+public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
+
+    @DataProvider(name = "subscriptionTypes")
+    public static Object[] subscriptionTypes() {
+        return new Object[] {
+                SubscriptionType.Exclusive,
+                SubscriptionType.Failover,
+                SubscriptionType.Shared,
+                SubscriptionType.Key_Shared
+        };
+    }
+
+    @BeforeClass
+    protected void setup() throws Exception {
+        conf.setBrokerEntryMetadataInterceptors(Sets.newTreeSet(
+                "org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor"

Review comment:
       Should we also add `org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor` to test multiple `BrokerEntryMetadataInterceptors`?







[GitHub] [pulsar] merlimat commented on a change in pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
merlimat commented on a change in pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#discussion_r562052611



##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -1660,6 +1661,26 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
         }
     }
 
+    public static final String NONE_KEY = "NONE_KEY";
+    public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
+        try {
+            int readerIdx = metadataAndPayload.readerIndex();
+            skipBrokerEntryMetadataIfExist(metadataAndPayload);
+            MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+            metadataAndPayload.readerIndex(readerIdx);
+            byte[] key = NONE_KEY.getBytes();

Review comment:
       If there is no key, instead of calling `getBytes()` each time, we should just keep a static ref to the `byte[]` 
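
One way to read this suggestion, as a sketch under assumptions rather than the merged code: encode the fallback key once into a static `byte[]` and return that shared array whenever a message has neither key. The field `NONE_KEY_BYTES` and the helper `stickyKey`, which takes already-extracted values instead of a `MessageMetadata`, are hypothetical:

```java
import java.nio.charset.StandardCharsets;

public class NoneKeySketch {
    public static final String NONE_KEY = "NONE_KEY";
    // Encoded once at class initialization and reused for every keyless message.
    private static final byte[] NONE_KEY_BYTES = NONE_KEY.getBytes(StandardCharsets.UTF_8);

    /** Hypothetical helper: ordering key first, then partition key, then the shared fallback. */
    static byte[] stickyKey(byte[] orderingKey, String partitionKey) {
        if (orderingKey != null) {
            return orderingKey;
        }
        if (partitionKey != null) {
            return partitionKey.getBytes(StandardCharsets.UTF_8);
        }
        return NONE_KEY_BYTES; // no per-call allocation on the keyless path
    }
}
```

The trade-off is that every caller receives the same mutable array, so this only works if callers treat the returned bytes as read-only.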







[GitHub] [pulsar] BewareMyPower commented on a change in pull request #9255: Fix peek message metadata broker while enable broker entry metadata.

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on a change in pull request #9255:
URL: https://github.com/apache/pulsar/pull/9255#discussion_r561770149



##########
File path: pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
##########
@@ -1660,6 +1661,26 @@ public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, St
         }
     }
 
+    public static final String NONE_KEY = "NONE_KEY";
+    public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
+        try {
+            int readerIdx = metadataAndPayload.readerIndex();
+            skipBrokerEntryMetadataIfExist(metadataAndPayload);
+            MessageMetadata metadata = Commands.parseMessageMetadata(metadataAndPayload);
+            metadataAndPayload.readerIndex(readerIdx);
+            byte[] key = NONE_KEY.getBytes();

Review comment:
       I think we should also use `getBytes(StandardCharsets.UTF_8)` here.






