Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/06/14 16:16:40 UTC
[GitHub] [pulsar] BewareMyPower opened a new pull request, #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
BewareMyPower opened a new pull request, #16061:
URL: https://github.com/apache/pulsar/pull/16061
### Motivation
This is the first part of supporting chunking with Shared subscriptions.
Instead of adopting the solution from the original proposal, this PR
uses a simpler approach:
1. Skip dispatching incomplete chunks and add them to the replay queue.
2. If a consumer does not have enough permits to receive a whole chunked
message, skip this consumer.
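The consumer-skipping rule in step 2 can be sketched as follows. This is a hypothetical model, not the broker's dispatcher code: `ChunkAwareConsumerSelector`, `ConsumerState` and `select` are illustrative names. The sketch assumes that delivering a chunked message consumes one permit per chunk, and it uses records, so it needs Java 16+.

```java
import java.util.List;
import java.util.Optional;

public class ChunkAwareConsumerSelector {

    // Hypothetical stand-in for a broker-side consumer: only a name and its permit count.
    record ConsumerState(String name, int availablePermits) {}

    // Walks the consumers round-robin from startIndex and returns the first one whose
    // permits cover every chunk of the message (assumption: one permit per chunk).
    // Consumers with too few permits are skipped instead of receiving a partial message.
    static Optional<ConsumerState> select(List<ConsumerState> consumers, int startIndex, int numChunks) {
        int n = consumers.size();
        for (int i = 0; i < n; i++) {
            ConsumerState c = consumers.get((startIndex + i) % n);
            if (c.availablePermits() >= numChunks) {
                return Optional.of(c);
            }
        }
        // No consumer can take the whole message right now; the caller keeps it for later.
        return Optional.empty();
    }

    public static void main(String[] args) {
        List<ConsumerState> consumers = List.of(
                new ConsumerState("c0", 1),
                new ConsumerState("c1", 5));
        // A 3-chunk message skips c0 (1 permit) and goes to c1.
        System.out.println(select(consumers, 0, 3).map(ConsumerState::name).orElse("none")); // c1
        // A 6-chunk message fits no consumer.
        System.out.println(select(consumers, 0, 6).map(ConsumerState::name).orElse("none")); // none
    }
}
```

Returning `Optional.empty()` instead of splitting the message keeps every chunk of a message on a single consumer.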
### Modifications
Add an `EntryAndMetadataList` class to hold a list of entries and the
associated metadata instances, as well as the following methods:
- `sortChunks`: Sort the entries so that all chunks that belong to
the same message are dispatched consistently. All incomplete
chunks are put at the end, after the "watermark" index.
- `containIncompleteChunks`: Detect whether the range `[start, end]` contains
incomplete chunks.
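Here is a minimal sketch of what `sortChunks` and `containIncompleteChunks` are described as doing. It uses a simplified `Msg` record instead of Pulsar's `Entry` and `MessageMetadata`. All names are hypothetical. The exact semantics are assumptions chosen to reproduce the `M0, M1-C0, M2, M3, M1-C1, M4` example in the javadoc, not the PR's actual code:

- A complete chunked message is emitted at the position of its last chunk.
- A range counts as "incomplete" if some message in it has fewer chunks inside the range than `numChunks`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ChunkSorter {

    // Hypothetical simplified metadata; uuid == null means "not a chunk".
    record Msg(String label, String uuid, int chunkId, int numChunks) {
        static Msg plain(String label) {
            return new Msg(label, null, -1, 1);
        }
        static Msg chunk(String label, String uuid, int chunkId, int numChunks) {
            return new Msg(label, uuid, chunkId, numChunks);
        }
        boolean isChunk() {
            return uuid != null;
        }
    }

    // Sorted entries plus the index at which the incomplete chunks begin.
    record Sorted(List<Msg> entries, int watermark) {}

    // Non-chunk entries keep their order; a chunked message is emitted as one contiguous
    // group at the position of its last-seen chunk once all numChunks chunks were seen.
    // Chunks of messages that never complete are appended after the watermark.
    // Assumes the input contains no duplicate chunks.
    static Sorted sortChunks(List<Msg> input) {
        List<Msg> out = new ArrayList<>(input.size());
        Map<String, List<Msg>> pending = new LinkedHashMap<>(); // uuid -> chunks seen so far
        for (Msg m : input) {
            if (!m.isChunk()) {
                out.add(m);
                continue;
            }
            List<Msg> group = pending.computeIfAbsent(m.uuid(), k -> new ArrayList<>());
            group.add(m);
            if (group.size() == m.numChunks()) {
                out.addAll(group);
                pending.remove(m.uuid());
            }
        }
        int watermark = out.size();
        pending.values().forEach(out::addAll);
        return new Sorted(out, watermark);
    }

    // True if some chunked message appearing in [start, end] (inclusive) has fewer
    // chunks inside the range than its numChunks.
    static boolean containIncompleteChunks(List<Msg> entries, int start, int end) {
        Map<String, Integer> seen = new HashMap<>();
        Map<String, Integer> expected = new HashMap<>();
        for (int i = start; i <= end; i++) {
            Msg m = entries.get(i);
            if (m.isChunk()) {
                seen.merge(m.uuid(), 1, Integer::sum);
                expected.put(m.uuid(), m.numChunks());
            }
        }
        return seen.entrySet().stream().anyMatch(e -> e.getValue() < expected.get(e.getKey()));
    }

    public static void main(String[] args) {
        List<Msg> input = List.of(
                Msg.plain("M0"), Msg.chunk("M1-C0", "m1", 0, 2), Msg.plain("M2"),
                Msg.plain("M3"), Msg.chunk("M1-C1", "m1", 1, 2), Msg.plain("M4"),
                Msg.chunk("M5-C0", "m5", 0, 2)); // M5's second chunk has not been read yet
        Sorted sorted = sortChunks(input);
        System.out.println(sorted.entries().stream().map(Msg::label).toList());
        // [M0, M2, M3, M1-C0, M1-C1, M4, M5-C0]
        System.out.println(sorted.watermark()); // 6
        System.out.println(containIncompleteChunks(sorted.entries(), 0, 5)); // false
        System.out.println(containIncompleteChunks(sorted.entries(), 0, 3)); // true: M1-C1 is outside
    }
}
```

With this ordering, entries before `watermark` can be dispatched without splitting a chunked message. Entries from `watermark` onward are the ones the PR adds to the replay queue.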
Then, in the `PersistentDispatcherMultipleConsumers#sendMessages`,
leverage the `EntryAndMetadataList` class to implement the logic
described in the previous section.
Finally, remove the restriction in `ConsumerImpl` that prevented processing chunks for Shared
subscriptions.
### Verifying this change
`MessageChunkingTest#testChunkMessagesWithSharedSubscriptions` is added.
### Documentation
Check the box below or label this PR directly.
Need to update docs?
- [x] `doc-required`
(Your PR needs to update docs and you will update later)
- [ ] `doc-not-needed`
(Please explain why)
- [ ] `doc`
(Your PR contains doc changes)
- [ ] `doc-complete`
(Docs have already been added)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [pulsar] BewareMyPower commented on pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#issuecomment-1158567970
Closing this PR. I'll implement it in another PR.
The main problem is that this PR's solution adds all incomplete chunks to the replay queue (`redeliveryMessages`). However, the `readMoreEntries` method either replays the positions from the replay queue or reads new entries, not both. That's why I call `readMoreEntries` before adding messages to the replay queue. Even so, this can still fail when multiple chunked messages are sent.
For example, suppose there are 2 chunked messages, M0 and M1, each with 3 chunks.
1. Read M0-C0, M0-C1, M0-C2, M1-C0.
2. M0 is dispatched to a suitable consumer, and M1-C0 is cached.
Then there are two possibilities:
- [OK] M1-C1 and M1-C2 are read. Since they are incomplete chunks, they are added to the replay queue. The next time, `readMoreEntries` replays all 3 chunks and sends them to the consumer.
- [Failed] M1-C0 is replayed by `readMoreEntries` before M1-C1 and M1-C2 are read, so M1-C0 is added to the replay queue again. These steps then repeat forever.
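The two orderings above can be reproduced with a small simulation. It is a hypothetical model of the behavior described in this comment, not Pulsar's `PersistentDispatcherMultipleConsumers`. It makes three modeling choices:

- Each round does exactly one read, either a replay or a read of new entries.
- `batchSize = 4` is an assumption chosen to match step 1.
- The race between the two reads is modeled by the `newReadWinsRace` flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplayLivelockSim {

    // One chunk of a chunked message (simplified; not Pulsar's MessageMetadata).
    record Chunk(String msg, int chunkId, int numChunks) {}

    // Each round performs ONE read: either replay the whole replay queue or read up to
    // batchSize new entries, never both. Complete messages are dispatched; chunks of
    // incomplete messages are put (back) into the replay queue.
    // newReadWinsRace = true models the [OK] ordering: the read in round 1 fetches new
    // entries even though M1-C0 is already waiting in the replay queue.
    static List<String> simulate(List<Chunk> ledger, int batchSize, boolean newReadWinsRace, int maxRounds) {
        Deque<Chunk> replay = new ArrayDeque<>();
        List<String> dispatched = new ArrayList<>();
        int readPos = 0;
        for (int round = 0; round < maxRounds; round++) {
            List<Chunk> batch = new ArrayList<>();
            boolean forceNewRead = newReadWinsRace && round == 1;
            if (!replay.isEmpty() && !forceNewRead) {
                batch.addAll(replay);
                replay.clear();
            } else {
                int end = Math.min(readPos + batchSize, ledger.size());
                batch.addAll(ledger.subList(readPos, end));
                readPos = end;
            }
            // Group the batch by message, keeping first-seen order.
            Map<String, List<Chunk>> byMsg = new LinkedHashMap<>();
            for (Chunk c : batch) {
                byMsg.computeIfAbsent(c.msg(), k -> new ArrayList<>()).add(c);
            }
            for (Map.Entry<String, List<Chunk>> e : byMsg.entrySet()) {
                List<Chunk> chunks = e.getValue();
                if (chunks.size() == chunks.get(0).numChunks()) {
                    dispatched.add(e.getKey());
                } else {
                    replay.addAll(chunks);
                }
            }
        }
        return dispatched;
    }

    public static void main(String[] args) {
        List<Chunk> ledger = List.of(
                new Chunk("M0", 0, 3), new Chunk("M0", 1, 3), new Chunk("M0", 2, 3),
                new Chunk("M1", 0, 3), new Chunk("M1", 1, 3), new Chunk("M1", 2, 3));
        // batchSize = 4 reproduces step 1: M0-C0, M0-C1, M0-C2, M1-C0.
        System.out.println(simulate(ledger, 4, true, 10));  // [M0, M1]
        System.out.println(simulate(ledger, 4, false, 10)); // [M0]: M1-C0 is replayed forever
    }
}
```

In the failed run the read position never advances past M1-C0. M1-C1 and M1-C2 are never read, so M1 is never dispatched no matter how many rounds are simulated.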
Another flaw is that it cannot handle the limits of the shared dispatcher, such as `dispatcherMaxReadSizeBytes` and `dispatcherMaxReadBatchSize`. If the total size of a chunked message were greater than `dispatcherMaxReadSizeBytes`, all of its chunks could never be gathered in a single `readEntriesComplete` call, and the incomplete chunks would be replayed forever.
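The read-limit argument reduces to a simple check. The sketch below makes these assumptions:

- Each read returns at most `dispatcherMaxReadBatchSize` entries and at most `dispatcherMaxReadSizeBytes` bytes.
- Replayed reads are subject to the same limits.

The chunk sizes and limits in `main` are made-up numbers. The real read path may estimate entry sizes rather than summing them exactly.

```java
import java.util.Arrays;

public class ReadLimitCheck {

    // Simplified model (assumption): one read returns at most maxReadBatchSize entries and
    // at most maxReadSizeBytes bytes. A chunked message can only be completed inside one
    // readEntriesComplete callback if all of its chunks fit into one such read.
    static boolean fitsInOneRead(long[] chunkSizes, long maxReadSizeBytes, int maxReadBatchSize) {
        long total = 0;
        for (long size : chunkSizes) {
            total += size;
        }
        return chunkSizes.length <= maxReadBatchSize && total <= maxReadSizeBytes;
    }

    public static void main(String[] args) {
        long mib = 1024L * 1024L;
        long[] tenChunks = new long[10];
        Arrays.fill(tenChunks, mib); // 10 chunks of 1 MiB each = 10 MiB in total
        // With a hypothetical 5 MiB read budget, the message can never be gathered.
        System.out.println(fitsInOneRead(tenChunks, 5 * mib, 100));  // false
        System.out.println(fitsInOneRead(tenChunks, 16 * mib, 100)); // true
        System.out.println(fitsInOneRead(tenChunks, 16 * mib, 8));   // false: too many entries
    }
}
```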
[GitHub] [pulsar] eolivelli commented on a diff in pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#discussion_r897222178
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadataList.java:
##########
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import lombok.Getter;
+import org.apache.bookkeeper.mledger.Entry;
+import org.apache.pulsar.common.api.proto.MessageMetadata;
+import org.apache.pulsar.common.protocol.Commands;
+
+@Getter
+public class EntryAndMetadataList {
+
+ private List<Entry> entries;
+ private List<MessageMetadata> metadataList;
+ @Getter
+ private int watermark;
+
+ public EntryAndMetadataList(final List<Entry> entries, final String subscription) {
+ this(entries, entries.stream()
+ .map(e -> Commands.peekAndCopyMessageMetadata(e.getDataBuffer(), subscription, -1))
+ .collect(Collectors.toList()));
+ }
+
+ @VisibleForTesting
+ EntryAndMetadataList(final List<Entry> entries, final List<MessageMetadata> metadataList) {
+ this.entries = entries;
+ this.metadataList = metadataList;
+ this.watermark = entries.size();
+ }
+
+ public int size() {
+ return entries.size();
+ }
+
+ /**
+ * it will sort the entries to make all chunks of the same message consistent in distribution.
+ *
+ * For example, if the original entries are:
+ * M0, M1-C0, M2, M3, M1-C1, M4
+ * where M1 is a chunked message that has 2 chunks (C0 and C1).
+ * We should sort them to:
+ * M0, M2, M3, M1-C0, M1-C1, M4
+ */
+ public void sortChunks() {
+ boolean hasChunks = metadataList.stream().anyMatch(MessageMetadata::hasUuid);
Review Comment:
We already scan the list in the constructor. Maybe we can do this check there, which would save one full scan.
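Here is a minimal sketch of the suggestion: compute the chunk flag during the single metadata-parsing pass in the constructor, so that `sortChunks` can return early without rescanning. The class and its generic parameters are hypothetical stand-ins so the sketch compiles on its own. In the PR, `parseMetadata` would correspond to `e -> Commands.peekAndCopyMessageMetadata(e.getDataBuffer(), subscription, -1)` and `isChunk` to `MessageMetadata::hasUuid`. Both are taken from the diff above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Generic sketch: E stands in for Entry, M for MessageMetadata.
public class SinglePassEntryList<E, M> {

    private final List<E> entries;
    private final List<M> metadataList;
    private final boolean hasChunks;

    SinglePassEntryList(List<E> entries, Function<E, M> parseMetadata, Predicate<M> isChunk) {
        this.entries = entries;
        List<M> parsed = new ArrayList<>(entries.size());
        boolean anyChunk = false;
        for (E entry : entries) {
            M metadata = parseMetadata.apply(entry); // the one unavoidable parse per entry
            parsed.add(metadata);
            anyChunk |= isChunk.test(metadata);      // reuse the same pass for the chunk check
        }
        this.metadataList = Collections.unmodifiableList(parsed);
        this.hasChunks = anyChunk;
    }

    // sortChunks() could begin with `if (!hasChunks) return;` instead of rescanning metadataList.
    boolean hasChunks() {
        return hasChunks;
    }

    int size() {
        return entries.size();
    }

    List<M> metadataList() {
        return metadataList;
    }

    public static void main(String[] args) {
        // Strings play both roles here; "-C" marks a chunk (purely illustrative).
        SinglePassEntryList<String, String> list = new SinglePassEntryList<>(
                List.of("M0", "M1-C0", "M1-C1"), s -> s, s -> s.contains("-C"));
        System.out.println(list.hasChunks()); // true
    }
}
```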
[GitHub] [pulsar] BewareMyPower closed pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
Posted by GitBox <gi...@apache.org>.
BewareMyPower closed pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
URL: https://github.com/apache/pulsar/pull/16061
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#discussion_r897458337
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadataList.java:
##########
@@ -0,0 +1,144 @@
+ public void sortChunks() {
+ boolean hasChunks = metadataList.stream().anyMatch(MessageMetadata::hasUuid);
Review Comment:
Good suggestion. In addition, when the incomplete chunks are added to the replay queue in `addMessageToReplay`, we can also avoid parsing the metadata again. I'll fix that as well.
[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#discussion_r897481026
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadataList.java:
##########
@@ -0,0 +1,144 @@
+public class EntryAndMetadataList {
+
+ private List<Entry> entries;
Review Comment:
This class acts more like a **temporary** reference to the list of entries and metadata. The incomplete chunks are actually stored in the redelivery tracker.
[GitHub] [pulsar] Jason918 commented on a diff in pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#discussion_r897463024
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/EntryAndMetadataList.java:
##########
@@ -0,0 +1,144 @@
+public class EntryAndMetadataList {
+
+ private List<Entry> entries;
Review Comment:
I am concerned that holding all incomplete chunk entries in broker memory will increase the risk of OOM, since chunked messages are normally very large.
[GitHub] [pulsar] BewareMyPower commented on pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#issuecomment-1155425301
https://lists.apache.org/thread/c7qo8x2fr52s0o0lgs5k53nrhq23r6qs Although I had planned to write a proposal, I'm still wondering whether this PR needs one. Support for chunked messages is mainly implemented on the client side; this PR is more like special handling in the shared dispatcher.
[GitHub] [pulsar] BewareMyPower commented on pull request #16061: [feature][broker] PIP 37: Support chunking with Shared subscription
Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on PR #16061:
URL: https://github.com/apache/pulsar/pull/16061#issuecomment-1155928418
I found a serious problem with this PR; I'll handle it first.