You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/22 16:21:55 UTC

[incubator-pulsar] branch master updated: Utility to rebatch a message (#1391)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 6541720  Utility to rebatch a message (#1391)
6541720 is described below

commit 65417201d0b65058a4a4d81eb5dfe12e6b769b8b
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Thu Mar 22 17:21:53 2018 +0100

    Utility to rebatch a message (#1391)
    
    * Utility to rebatch a message
    
    When compacting a batched message, we need to take the message, break
    it into its constituent submessages, select the submessages we wish to
    keep, and rebuild the batched message. In other words rebatching. To
    maintain the same batched message ids, the submessages which are not
    selected are replaced with dummy messages with the compactedOut flag
    set to true.
    
    This patch contains a utility method to do this rebatching, along with
    another utility to extract the ids and keys from a batch. These will
    be used to add batching support to compaction in a later patch.
    
    * Avoid double release of m1
---
 .../pulsar/client/impl/RawBatchConverter.java      | 110 ++++++++++++++++-----
 .../apache/pulsar/client/impl/RawReaderTest.java   |  96 +++++++++---------
 2 files changed, 137 insertions(+), 69 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index b80f216..fa62982 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -18,14 +18,22 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import org.apache.pulsar.client.api.RawMessage;
+import java.util.Optional;
+import java.util.function.BiPredicate;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
@@ -36,45 +44,97 @@ import org.slf4j.LoggerFactory;
 public class RawBatchConverter {
     private static final Logger log = LoggerFactory.getLogger(RawBatchConverter.class);
 
-    private static MessageMetadata mergeMetadata(MessageMetadata batchMetadata,
-                                                 SingleMessageMetadata.Builder singleMessageMetadata) {
-        // is uncompressed size correct?
-        return batchMetadata.toBuilder()
-            .setNumMessagesInBatch(1)
-            .setUncompressedSize(singleMessageMetadata.getPayloadSize())
-            .addAllProperties(singleMessageMetadata.getPropertiesList())
-            .setPartitionKey(singleMessageMetadata.getPartitionKey()).build();
-    }
-
     public static boolean isBatch(RawMessage msg) {
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
         int batchSize = metadata.getNumMessagesInBatch();
-        return batchSize > 0;
+        return batchSize > 1;
     }
 
-    public static Collection<RawMessage> explodeBatch(RawMessage msg) throws IOException {
-        assert(msg.getMessageIdData().getBatchIndex() == -1);
+    public static List<ImmutablePair<MessageId,String>> extractIdsAndKeys(RawMessage msg)
+            throws IOException {
+        checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
         int batchSize = metadata.getNumMessagesInBatch();
-        List<RawMessage> exploded = new ArrayList<>();
+        metadata.recycle();
+
+        List<ImmutablePair<MessageId,String>> idsAndKeys = new ArrayList<>();
+
         for (int i = 0; i < batchSize; i++) {
             SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata.newBuilder();
             ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload,
                                                                                     singleMessageMetadataBuilder,
                                                                                     0, batchSize);
-
-            // serializeMetadataAndPayload takes ownership of the the payload
-            ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(
-                    Commands.ChecksumType.Crc32c, mergeMetadata(metadata, singleMessageMetadataBuilder),
-                    singleMessagePayload);
-            exploded.add(new RawMessageImpl(msg.getMessageIdData().toBuilder().setBatchIndex(i).build(),
-                                            metadataAndPayload));
-            metadataAndPayload.release();
+            MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
+                                                  msg.getMessageIdData().getEntryId(),
+                                                  msg.getMessageIdData().getPartition(),
+                                                  i);
+            if (!singleMessageMetadataBuilder.getCompactedOut()) {
+                idsAndKeys.add(ImmutablePair.of(id, singleMessageMetadataBuilder.getPartitionKey()));
+            }
+            singleMessageMetadataBuilder.recycle();
             singleMessagePayload.release();
         }
-        return exploded;
+        return idsAndKeys;
+    }
+
+    /**
+     * Takes a batched message and a filter, and returns a message with only the submessages
+     * which match the filter. Returns an empty optional if no messages match.
+     *
+     * This takes ownership of the passed-in message, which is always closed before returning. If the
+     * returned optional is not empty, the caller owns the returned message and must close it.
+     */
+    public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+                                                      BiPredicate<String, MessageId> filter)
+            throws IOException {
+        checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
+
+        ByteBuf payload = msg.getHeadersAndPayload();
+        MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+        ByteBuf batchBuffer = PooledByteBufAllocator.DEFAULT.buffer(payload.capacity());
+        try {
+            int batchSize = metadata.getNumMessagesInBatch();
+            int messagesRetained = 0;
+
+            SingleMessageMetadata.Builder emptyMetadataBuilder = SingleMessageMetadata.newBuilder().setCompactedOut(true);
+            for (int i = 0; i < batchSize; i++) {
+                SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata.newBuilder();
+                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload,
+                                                                                        singleMessageMetadataBuilder,
+                                                                                        0, batchSize);
+                String key = singleMessageMetadataBuilder.getPartitionKey();
+                MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
+                                                      msg.getMessageIdData().getEntryId(),
+                                                      msg.getMessageIdData().getPartition(),
+                                                      i);
+                if (filter.test(key, id)) {
+                    messagesRetained++;
+                    Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
+                                                                      singleMessagePayload, batchBuffer);
+                } else {
+                    Commands.serializeSingleMessageInBatchWithPayload(emptyMetadataBuilder,
+                                                                      Unpooled.EMPTY_BUFFER, batchBuffer);
+                }
+                singleMessageMetadataBuilder.recycle();
+                singleMessagePayload.release();
+            }
+            emptyMetadataBuilder.recycle();
+
+            if (messagesRetained > 0) {
+                ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
+                                                                                  metadata, batchBuffer);
+                return Optional.of(new RawMessageImpl(msg.getMessageIdData(),
+                                                      metadataAndPayload));
+            } else {
+                return Optional.empty();
+            }
+        } finally {
+            batchBuffer.release();
+            metadata.recycle();
+            msg.close();
+        }
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index b63d9c4..e061e3e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -28,9 +28,9 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
 
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -74,12 +74,10 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         super.internalCleanup();
     }
 
-    private Set<String> publishMessagesBase(String topic, int count, boolean batching) throws Exception {
+    private Set<String> publishMessages(String topic, int count) throws Exception {
         Set<String> keys = new HashSet<>();
 
-        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(count)
-                .enableBatching(batching).batchingMaxMessages(BATCH_MAX_MESSAGES)
-                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.DAYS).create()) {
+        try (Producer<byte[]> producer = pulsarClient.newProducer().maxPendingMessages(count).topic(topic).create()) {
             Future<?> lastFuture = null;
             for (int i = 0; i < count; i++) {
                 String key = "key"+i;
@@ -94,14 +92,6 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
         return keys;
     }
 
-    private Set<String> publishMessages(String topic, int count) throws Exception {
-        return publishMessagesBase(topic, count, false);
-    }
-
-    private Set<String> publishMessagesInBatches(String topic, int count) throws Exception {
-        return publishMessagesBase(topic, count, true);
-    }
-
     public static String extractKey(RawMessage m) throws Exception {
         ByteBuf headersAndPayload = m.getHeadersAndPayload();
         MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
@@ -239,45 +229,63 @@ public class RawReaderTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testBatching() throws Exception {
-        int numMessages = BATCH_MAX_MESSAGES * 5;
+    public void testBatchingExtractKeysAndIds() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-raw-topic";
 
-        Set<String> keys = publishMessagesInBatches(topic, numMessages);
+        try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                .enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1").setContent("my-content-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2").setContent("my-content-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key3").setContent("my-content-3".getBytes()).build()).get();
+        }
 
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
+        try (RawMessage m = reader.readNextAsync().get()) {
+            List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m);
 
-        Consumer<RawMessage> consumer = new Consumer<RawMessage>() {
-                BatchMessageIdImpl lastId = new BatchMessageIdImpl(-1, -1, -1, -1);
+            Assert.assertEquals(idsAndKeys.size(), 3);
 
-                @Override
-                public void accept(RawMessage m) {
-                    try {
-                        Assert.assertTrue(keys.remove(extractKey(m)));
-                        Assert.assertTrue(m.getMessageId() instanceof BatchMessageIdImpl);
-                        BatchMessageIdImpl id = (BatchMessageIdImpl)m.getMessageId();
+            // assert message ids are in correct order
+            Assert.assertTrue(idsAndKeys.get(0).getLeft().compareTo(idsAndKeys.get(1).getLeft()) < 0);
+            Assert.assertTrue(idsAndKeys.get(1).getLeft().compareTo(idsAndKeys.get(2).getLeft()) < 0);
 
-                        // id should be greater than lastId
-                        Assert.assertEquals(id.compareTo(lastId), 1);
-                    } catch (Exception e) {
-                        Assert.fail("Error checking message", e);
-                    }
-                }
-            };
-        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
-        while (true) {
-            try (RawMessage m = reader.readNextAsync().get()) {
-                if (RawBatchConverter.isBatch(m)) {
-                    RawBatchConverter.explodeBatch(m).forEach(consumer);
-                } else {
-                    consumer.accept(m);
-                }
-                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
-                    break;
-                }
-            }
+            // assert keys are as expected
+            Assert.assertEquals(idsAndKeys.get(0).getRight(), "key1");
+            Assert.assertEquals(idsAndKeys.get(1).getRight(), "key2");
+            Assert.assertEquals(idsAndKeys.get(2).getRight(), "key3");
+        } finally {
+            reader.closeAsync().get();
+        }
+    }
+
+    @Test
+    public void testBatchingRebatch() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-raw-topic";
+
+        try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                .enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1").setContent("my-content-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2").setContent("my-content-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key3").setContent("my-content-3".getBytes()).build()).get();
+        }
+
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
+        try {
+            RawMessage m1 = reader.readNextAsync().get();
+            RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
+            List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
+            Assert.assertEquals(idsAndKeys.size(), 1);
+            Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
+            m2.close();
+        } finally {
+            reader.closeAsync().get();
         }
-        Assert.assertTrue(keys.isEmpty());
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.