Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/14 16:16:40 UTC

[GitHub] [pulsar] BewareMyPower opened a new pull request, #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

BewareMyPower opened a new pull request, #16061:
URL: https://github.com/apache/pulsar/pull/16061

   ### Motivation
   
   This is the first part of supporting chunking with Shared subscriptions.
   Instead of adopting the solutions from the original proposal, this PR
   uses a simpler approach:
   1. Skip dispatching incomplete chunks and add them to the replay queue.
   2. If a consumer does not have enough permits to receive a whole chunked
      message, skip this consumer.
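A minimal sketch of rule 2, assuming each chunk consumes one permit and that a skipped consumer is simply passed over in favor of the next one in round-robin order. `ChunkAwareConsumerPicker` and `SimpleConsumer` are hypothetical stand-ins, not Pulsar's dispatcher or `Consumer` classes.

```java
import java.util.List;

// Hypothetical sketch: round-robin consumer selection that skips consumers
// lacking enough permits for a whole chunked message.
final class ChunkAwareConsumerPicker {

    // Simplified stand-in for a broker-side consumer (not Pulsar's Consumer class).
    static final class SimpleConsumer {
        final String name;
        final int availablePermits;

        SimpleConsumer(String name, int availablePermits) {
            this.name = name;
            this.availablePermits = availablePermits;
        }
    }

    /**
     * Returns the index of the first consumer, starting at startIndex and wrapping
     * around, whose permits cover numChunks, or -1 if no consumer can take the
     * whole message. Assumes one permit per chunk.
     */
    static int pickConsumer(List<SimpleConsumer> consumers, int startIndex, int numChunks) {
        int n = consumers.size();
        for (int i = 0; i < n; i++) {
            int idx = (startIndex + i) % n;
            if (consumers.get(idx).availablePermits >= numChunks) {
                return idx; // enough permits for every chunk
            }
            // Skip: dispatching only some chunks would split the message.
        }
        return -1;
    }
}
```

Returning -1 rather than falling back to a partial dispatch keeps all chunks of one message on a single consumer, which is the point of the rule.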
   
   ### Modifications
   
   Add an `EntryAndMetadataList` class to hold a list of entries and the
   associated metadata instances, as well as the following methods:
   - `sortChunks`: Sort the entries so that all chunks of the same message
     are placed together and therefore distributed consistently. All
     incomplete chunks are moved to the end, after the "watermark" index.
   - `containIncompleteChunks`: Detect whether the range `[start, end]`
     contains incomplete chunks.
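A simplified model of the two helpers, assuming each entry's metadata reduces to a chunk UUID (absent for non-chunked messages) plus the total chunk count, and that a message's chunks appear in chunk-ID order within one read. `ChunkSorter` and `Msg` are hypothetical names; the real class works on `Entry` and `MessageMetadata` lists. Here `watermark` is read as the index where the incomplete chunks begin, which is our interpretation rather than something the PR states.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of sortChunks and containIncompleteChunks.
final class ChunkSorter {

    // Stand-in for an entry plus its metadata; uuid == null means "not chunked".
    static final class Msg {
        final String name;
        final String uuid;
        final int numChunks;

        private Msg(String name, String uuid, int numChunks) {
            this.name = name;
            this.uuid = uuid;
            this.numChunks = numChunks;
        }

        static Msg plain(String name) { return new Msg(name, null, 1); }

        static Msg chunk(String name, String uuid, int numChunks) {
            return new Msg(name, uuid, numChunks);
        }

        boolean isChunk() { return uuid != null; }

        @Override
        public String toString() { return name; }
    }

    final List<Msg> msgs = new ArrayList<>();
    int watermark;

    // Places each complete chunked message at the position of its last chunk and
    // appends incomplete chunks at the end; watermark = index of the first incomplete chunk.
    void sortChunks(List<Msg> input) {
        msgs.clear();
        Map<String, List<Msg>> pending = new LinkedHashMap<>(); // uuid -> chunks seen so far
        for (Msg m : input) {
            if (!m.isChunk()) {
                msgs.add(m);
                continue;
            }
            List<Msg> chunks = pending.computeIfAbsent(m.uuid, k -> new ArrayList<>());
            chunks.add(m);
            if (chunks.size() == m.numChunks) { // every chunk is in this batch
                msgs.addAll(chunks);
                pending.remove(m.uuid);
            }
        }
        watermark = msgs.size();
        pending.values().forEach(msgs::addAll); // incomplete chunks go after the watermark
    }

    // Returns true if msgs[start..end] (inclusive) holds only part of some chunked message.
    boolean containIncompleteChunks(int start, int end) {
        Map<String, Integer> counts = new HashMap<>();
        Map<String, Integer> totals = new HashMap<>();
        for (int i = start; i <= end; i++) {
            Msg m = msgs.get(i);
            if (m.isChunk()) {
                counts.merge(m.uuid, 1, Integer::sum);
                totals.put(m.uuid, m.numChunks);
            }
        }
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            if (!e.getValue().equals(totals.get(e.getKey()))) {
                return true;
            }
        }
        return false;
    }
}
```

Feeding it `M0, M1-C0, M2, M3, M1-C1, M4` yields `M0, M2, M3, M1-C0, M1-C1, M4`, matching the example in the `sortChunks` Javadoc; a chunk whose siblings are not in the batch ends up past the watermark.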
   
   Then, in the `PersistentDispatcherMultipleConsumers#sendMessages`,
   leverage the `EntryAndMetadataList` class to implement the logic
   described in the previous section.
   
   Finally, remove the limitation in `ConsumerImpl` so that chunks can be
   processed for Shared subscriptions.
   
   ### Verifying this change
   
   `MessageChunkingTest#testChunkMessagesWithSharedSubscriptions` is added.
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [x] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have already been added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] BewareMyPower commented on pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#issuecomment-1158567970

   Closing this PR. I'll implement this in another PR.
   
   The main problem is that this PR's solution adds all incomplete chunks to the replay queue (`redeliveryMessages`). However, the `readMoreEntries` method either replays positions from the replay queue or reads new entries, not both. That's why I call `readMoreEntries` before adding messages to the replay queue. But the problem can still occur when multiple chunked messages are sent.
   
   For example, suppose there are two chunked messages, M0 and M1, each with 3 chunks.
   1. Read M0-C0, M0-C1, M0-C2, M1-C0.
   2. M0 is dispatched to a suitable consumer, and M1-C0 is cached.
   
   Then there are two possibilities:
   - [OK] M1-C1 and M1-C2 are read. Since they are incomplete chunks, they are added to the replay queue. The next time `readMoreEntries` is called, it replays all 3 chunks and sends them to the consumer.
   - [Failed] M1-C0 is replayed by `readMoreEntries` before M1-C1 and M1-C2 are read, so M1-C0 is added to the replay queue again. These steps then repeat forever.
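A toy model of the two orderings, assuming `readMoreEntries` replays every queued position whenever the replay queue is non-empty and otherwise reads all remaining new entries. `ReplayLoopModel` is hypothetical and ignores permits, read limits, and concurrency; `firstReadIsReplay` stands for whether the replay of M1-C0 happens before the read of M1-C1 and M1-C2.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model (not Pulsar code) of the replay loop for M1, which has 3 chunks.
final class ReplayLoopModel {
    static final int NUM_CHUNKS = 3;

    // Starts from the state after step 2: M1-C0 sits in the replay queue and
    // M1-C1, M1-C2 are unread. Returns true if M1 is dispatched within maxRounds.
    static boolean dispatchesM1(boolean firstReadIsReplay, int maxRounds) {
        Deque<String> unread = new ArrayDeque<>(List.of("M1-C1", "M1-C2"));
        Set<String> replayQueue = new LinkedHashSet<>(List.of("M1-C0"));
        for (int round = 0; round < maxRounds; round++) {
            // Assumption: replay whenever the queue is non-empty, else read new entries.
            boolean replay = (round == 0) ? firstReadIsReplay : !replayQueue.isEmpty();
            List<String> batch = new ArrayList<>();
            if (replay) {
                batch.addAll(replayQueue);
                replayQueue.clear();
            } else {
                while (!unread.isEmpty()) {
                    batch.add(unread.poll());
                }
            }
            if (batch.size() == NUM_CHUNKS) {
                return true; // all chunks of M1 in one batch: dispatch it
            }
            replayQueue.addAll(batch); // incomplete chunks go back to the replay queue
        }
        return false;
    }
}
```

`dispatchesM1(false, 10)` returns `true` after two rounds, while `dispatchesM1(true, n)` returns `false` for any `n`: the lone M1-C0 keeps cycling through the replay queue, so the new entries are never read.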
   
   Another flaw is that it cannot handle the shared dispatcher's limits, such as `dispatcherMaxReadSizeBytes` and `dispatcherMaxReadBatchSize`. If the total size of a chunked message is greater than `dispatcherMaxReadSizeBytes`, all of its chunks can never be gathered in the `readEntriesComplete` method, and the incomplete chunks will always be replayed.
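A simplified check of this condition, assuming a single read returns at most `dispatcherMaxReadBatchSize` entries and at most `dispatcherMaxReadSizeBytes` bytes (the real managed-ledger read path may behave differently at the boundaries). `ReadLimitCheck.fitsInOneRead` is a hypothetical helper, and the sizes used with it are illustrative only.

```java
// Simplified model, not Pulsar's read path: a chunked message can only be
// gathered by one read if its chunks fit within both read limits.
final class ReadLimitCheck {

    static boolean fitsInOneRead(long[] chunkSizes, long maxReadSizeBytes, int maxReadBatchSize) {
        long total = 0;
        for (long size : chunkSizes) {
            total += size; // total bytes of the whole chunked message
        }
        return chunkSizes.length <= maxReadBatchSize && total <= maxReadSizeBytes;
    }
}
```

For example, a message split into three 4 MB chunks fails this check against a 5 MB size limit, so under this PR's approach its chunks would be replayed indefinitely.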


[GitHub] [pulsar] eolivelli commented on a diff in pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#discussion_r897222178


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadataList.java:
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+
+@Getter
+public class EntryAndMetadataList {
+
+    private List<Entry> entries;
+    private List<MessageMetadata> metadataList;
+    @Getter
+    private int watermark;
+
+    public EntryAndMetadataList(final List<Entry> entries, final String subscription) {
+        this(entries, entries.stream()
+                .map(e -> Commands.peekAndCopyMessageMetadata(e.getDataBuffer(), subscription, -1))
+                .collect(Collectors.toList()));
+    }
+
+    @VisibleForTesting
+    EntryAndMetadataList(final List<Entry> entries, final List<MessageMetadata> metadataList) {
+        this.entries = entries;
+        this.metadataList = metadataList;
+        this.watermark = entries.size();
+    }
+
+    public int size() {
+        return entries.size();
+    }
+
+    /**
+     * it will sort the entries to make all chunks of the same message consistent in distribution.
+     *
+     * For example, if the original entries are:
+     *   M0, M1-C0, M2, M3, M1-C1, M4
+     * where M1 is a chunked message that has 2 chunks (C0 and C1).
+     * We should sort them to:
+     *   M0, M2, M3, M1-C0, M1-C1, M4
+     */
+    public void sortChunks() {
+        boolean hasChunks = metadataList.stream().anyMatch(MessageMetadata::hasUuid);

Review Comment:
   We already scan the list in the constructor. Maybe we can do this check there and save one full scan.
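A sketch of the suggestion, using hypothetical stand-ins (`EntryListSketch`, `Meta`, `parseMetadata`) for `EntryAndMetadataList`, `MessageMetadata`, and `Commands.peekAndCopyMessageMetadata`: the constructor records whether any metadata has a UUID during the same pass that parses it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the review suggestion: detect chunks during the
// constructor's parsing pass so sortChunks() needs no extra scan.
final class EntryListSketch {

    // Simplified stand-in for MessageMetadata.
    static final class Meta {
        final String uuid; // null for non-chunked messages

        Meta(String uuid) {
            this.uuid = uuid;
        }

        boolean hasUuid() {
            return uuid != null;
        }
    }

    final List<Meta> metadataList;
    final boolean hasChunks;

    EntryListSketch(List<String> rawEntries) {
        List<Meta> parsed = new ArrayList<>(rawEntries.size());
        boolean anyChunk = false;
        for (String raw : rawEntries) {
            Meta meta = parseMetadata(raw);
            parsed.add(meta);
            anyChunk |= meta.hasUuid(); // recorded in the same pass as parsing
        }
        this.metadataList = parsed;
        this.hasChunks = anyChunk;
    }

    // Hypothetical parser standing in for Commands.peekAndCopyMessageMetadata:
    // an entry written as "uuid:<id>" is treated as a chunk.
    private static Meta parseMetadata(String raw) {
        return raw.startsWith("uuid:") ? new Meta(raw.substring("uuid:".length())) : new Meta(null);
    }
}
```

With `hasChunks` available as a field, `sortChunks` could return immediately when it is `false` instead of streaming over `metadataList` a second time.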



[GitHub] [pulsar] BewareMyPower closed pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

Posted by GitBox <gi...@apache.org>.
BewareMyPower closed pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
URL: https://github.com/apache/pulsar/pull/16061


[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#discussion_r897458337


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadataList.java:
##########
+    public void sortChunks() {
+        boolean hasChunks = metadataList.stream().anyMatch(MessageMetadata::hasUuid);

Review Comment:
   Good suggestion. In addition, when the incomplete chunks are added to the replay queue in `addMessageToReplay`, we can also avoid parsing the metadata again. I'll fix that as well.



[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#discussion_r897481026


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadataList.java:
##########
+@Getter
+public class EntryAndMetadataList {
+
+    private List<Entry> entries;

Review Comment:
   This class acts more like a **temporary** reference to the list of entries and metadata. The incomplete chunks are actually stored in the redelivery tracker.



[GitHub] [pulsar] Jason918 commented on a diff in pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#discussion_r897463024


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadataList.java:
##########
+@Getter
+public class EntryAndMetadataList {
+
+    private List<Entry> entries;

Review Comment:
   I am concerned that this will increase the risk of OOM if we hold all incomplete chunk entries in broker memory, since chunked messages are normally very large.



[GitHub] [pulsar] BewareMyPower commented on pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#issuecomment-1155425301

   https://lists.apache.org/thread/c7qo8x2fr52s0o0lgs5k53nrhq23r6qs Although I had planned to write a proposal, I'm still wondering whether this PR needs one. Support for chunked messages is mainly implemented on the client side; this PR is more like special handling in the shared dispatcher.


[GitHub] [pulsar] BewareMyPower commented on pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#issuecomment-1155928418

   I found a serious problem with this PR. I'll address it first.

