You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/22 16:21:55 UTC

[GitHub] merlimat closed pull request #1391: Utility to rebatch a message

merlimat closed pull request #1391: Utility to rebatch a message
URL: https://github.com/apache/incubator-pulsar/pull/1391
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request originates from a fork, GitHub does not show
its diff once it has been merged, so the full diff is reproduced below:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index b80f21697..fa629827a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -18,14 +18,22 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
-import org.apache.pulsar.client.api.RawMessage;
+import java.util.Optional;
+import java.util.function.BiPredicate;
 
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
@@ -36,45 +44,97 @@
 public class RawBatchConverter {
     private static final Logger log = LoggerFactory.getLogger(RawBatchConverter.class);
 
-    private static MessageMetadata mergeMetadata(MessageMetadata batchMetadata,
-                                                 SingleMessageMetadata.Builder singleMessageMetadata) {
-        // is uncompressed size correct?
-        return batchMetadata.toBuilder()
-            .setNumMessagesInBatch(1)
-            .setUncompressedSize(singleMessageMetadata.getPayloadSize())
-            .addAllProperties(singleMessageMetadata.getPropertiesList())
-            .setPartitionKey(singleMessageMetadata.getPartitionKey()).build();
-    }
-
     public static boolean isBatch(RawMessage msg) {
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
         int batchSize = metadata.getNumMessagesInBatch();
-        return batchSize > 0;
+        return batchSize > 1;
     }
 
-    public static Collection<RawMessage> explodeBatch(RawMessage msg) throws IOException {
-        assert(msg.getMessageIdData().getBatchIndex() == -1);
+    public static List<ImmutablePair<MessageId,String>> extractIdsAndKeys(RawMessage msg)
+            throws IOException {
+        checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
 
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
         int batchSize = metadata.getNumMessagesInBatch();
-        List<RawMessage> exploded = new ArrayList<>();
+        metadata.recycle();
+
+        List<ImmutablePair<MessageId,String>> idsAndKeys = new ArrayList<>();
+
         for (int i = 0; i < batchSize; i++) {
             SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata.newBuilder();
             ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload,
                                                                                     singleMessageMetadataBuilder,
                                                                                     0, batchSize);
-
-            // serializeMetadataAndPayload takes ownership of the the payload
-            ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(
-                    Commands.ChecksumType.Crc32c, mergeMetadata(metadata, singleMessageMetadataBuilder),
-                    singleMessagePayload);
-            exploded.add(new RawMessageImpl(msg.getMessageIdData().toBuilder().setBatchIndex(i).build(),
-                                            metadataAndPayload));
-            metadataAndPayload.release();
+            MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
+                                                  msg.getMessageIdData().getEntryId(),
+                                                  msg.getMessageIdData().getPartition(),
+                                                  i);
+            if (!singleMessageMetadataBuilder.getCompactedOut()) {
+                idsAndKeys.add(ImmutablePair.of(id, singleMessageMetadataBuilder.getPartitionKey()));
+            }
+            singleMessageMetadataBuilder.recycle();
             singleMessagePayload.release();
         }
-        return exploded;
+        return idsAndKeys;
+    }
+
+    /**
+     * Takes a batched message and a filter, and returns a message containing only the submessages
+     * which match the filter. Returns an empty optional if no messages match.
+     *
+     * This takes ownership of the passed-in message, and if the returned optional is not empty,
+     * ownership of the returned message is transferred to the caller.
+     */
+    public static Optional<RawMessage> rebatchMessage(RawMessage msg,
+                                                      BiPredicate<String, MessageId> filter)
+            throws IOException {
+        checkArgument(msg.getMessageIdData().getBatchIndex() == -1);
+
+        ByteBuf payload = msg.getHeadersAndPayload();
+        MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+        ByteBuf batchBuffer = PooledByteBufAllocator.DEFAULT.buffer(payload.capacity());
+        try {
+            int batchSize = metadata.getNumMessagesInBatch();
+            int messagesRetained = 0;
+
+            SingleMessageMetadata.Builder emptyMetadataBuilder = SingleMessageMetadata.newBuilder().setCompactedOut(true);
+            for (int i = 0; i < batchSize; i++) {
+                SingleMessageMetadata.Builder singleMessageMetadataBuilder = SingleMessageMetadata.newBuilder();
+                ByteBuf singleMessagePayload = Commands.deSerializeSingleMessageInBatch(payload,
+                                                                                        singleMessageMetadataBuilder,
+                                                                                        0, batchSize);
+                String key = singleMessageMetadataBuilder.getPartitionKey();
+                MessageId id = new BatchMessageIdImpl(msg.getMessageIdData().getLedgerId(),
+                                                      msg.getMessageIdData().getEntryId(),
+                                                      msg.getMessageIdData().getPartition(),
+                                                      i);
+                if (filter.test(key, id)) {
+                    messagesRetained++;
+                    Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder,
+                                                                      singleMessagePayload, batchBuffer);
+                } else {
+                    Commands.serializeSingleMessageInBatchWithPayload(emptyMetadataBuilder,
+                                                                      Unpooled.EMPTY_BUFFER, batchBuffer);
+                }
+                singleMessageMetadataBuilder.recycle();
+                singleMessagePayload.release();
+            }
+            emptyMetadataBuilder.recycle();
+
+            if (messagesRetained > 0) {
+                ByteBuf metadataAndPayload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
+                                                                                  metadata, batchBuffer);
+                return Optional.of(new RawMessageImpl(msg.getMessageIdData(),
+                                                      metadataAndPayload));
+            } else {
+                return Optional.empty();
+            }
+        } finally {
+            batchBuffer.release();
+            metadata.recycle();
+            msg.close();
+        }
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
index b63d9c442..e061e3e49 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/RawReaderTest.java
@@ -28,9 +28,9 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
 
 import org.apache.bookkeeper.mledger.ManagedLedger;
+import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.MessageBuilder;
@@ -74,12 +74,10 @@ public void cleanup() throws Exception {
         super.internalCleanup();
     }
 
-    private Set<String> publishMessagesBase(String topic, int count, boolean batching) throws Exception {
+    private Set<String> publishMessages(String topic, int count) throws Exception {
         Set<String> keys = new HashSet<>();
 
-        try (Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(count)
-                .enableBatching(batching).batchingMaxMessages(BATCH_MAX_MESSAGES)
-                .batchingMaxPublishDelay(Long.MAX_VALUE, TimeUnit.DAYS).create()) {
+        try (Producer<byte[]> producer = pulsarClient.newProducer().maxPendingMessages(count).topic(topic).create()) {
             Future<?> lastFuture = null;
             for (int i = 0; i < count; i++) {
                 String key = "key"+i;
@@ -94,14 +92,6 @@ public void cleanup() throws Exception {
         return keys;
     }
 
-    private Set<String> publishMessages(String topic, int count) throws Exception {
-        return publishMessagesBase(topic, count, false);
-    }
-
-    private Set<String> publishMessagesInBatches(String topic, int count) throws Exception {
-        return publishMessagesBase(topic, count, true);
-    }
-
     public static String extractKey(RawMessage m) throws Exception {
         ByteBuf headersAndPayload = m.getHeadersAndPayload();
         MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
@@ -239,45 +229,63 @@ public void testFlowControl() throws Exception {
     }
 
     @Test
-    public void testBatching() throws Exception {
-        int numMessages = BATCH_MAX_MESSAGES * 5;
+    public void testBatchingExtractKeysAndIds() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-raw-topic";
 
-        Set<String> keys = publishMessagesInBatches(topic, numMessages);
+        try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                .enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1").setContent("my-content-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2").setContent("my-content-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key3").setContent("my-content-3".getBytes()).build()).get();
+        }
 
         RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
+        try (RawMessage m = reader.readNextAsync().get()) {
+            List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m);
 
-        Consumer<RawMessage> consumer = new Consumer<RawMessage>() {
-                BatchMessageIdImpl lastId = new BatchMessageIdImpl(-1, -1, -1, -1);
+            Assert.assertEquals(idsAndKeys.size(), 3);
 
-                @Override
-                public void accept(RawMessage m) {
-                    try {
-                        Assert.assertTrue(keys.remove(extractKey(m)));
-                        Assert.assertTrue(m.getMessageId() instanceof BatchMessageIdImpl);
-                        BatchMessageIdImpl id = (BatchMessageIdImpl)m.getMessageId();
+            // assert message ids are in correct order
+            Assert.assertTrue(idsAndKeys.get(0).getLeft().compareTo(idsAndKeys.get(1).getLeft()) < 0);
+            Assert.assertTrue(idsAndKeys.get(1).getLeft().compareTo(idsAndKeys.get(2).getLeft()) < 0);
 
-                        // id should be greater than lastId
-                        Assert.assertEquals(id.compareTo(lastId), 1);
-                    } catch (Exception e) {
-                        Assert.fail("Error checking message", e);
-                    }
-                }
-            };
-        MessageId lastMessageId = reader.getLastMessageIdAsync().get();
-        while (true) {
-            try (RawMessage m = reader.readNextAsync().get()) {
-                if (RawBatchConverter.isBatch(m)) {
-                    RawBatchConverter.explodeBatch(m).forEach(consumer);
-                } else {
-                    consumer.accept(m);
-                }
-                if (lastMessageId.compareTo(m.getMessageId()) == 0) {
-                    break;
-                }
-            }
+            // assert keys are as expected
+            Assert.assertEquals(idsAndKeys.get(0).getRight(), "key1");
+            Assert.assertEquals(idsAndKeys.get(1).getRight(), "key2");
+            Assert.assertEquals(idsAndKeys.get(2).getRight(), "key3");
+        } finally {
+            reader.closeAsync().get();
+        }
+    }
+
+    @Test
+    public void testBatchingRebatch() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-raw-topic";
+
+        try (Producer producer = pulsarClient.newProducer().topic(topic).maxPendingMessages(3)
+                .enableBatching(true).batchingMaxMessages(3).batchingMaxPublishDelay(1, TimeUnit.HOURS).create()) {
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key1").setContent("my-content-1".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key2").setContent("my-content-2".getBytes()).build());
+            producer.sendAsync(MessageBuilder.create()
+                               .setKey("key3").setContent("my-content-3".getBytes()).build()).get();
+        }
+
+        RawReader reader = RawReader.create(pulsarClient, topic, subscription).get();
+        try {
+            RawMessage m1 = reader.readNextAsync().get();
+            RawMessage m2 = RawBatchConverter.rebatchMessage(m1, (key, id) -> key.equals("key2")).get();
+            List<ImmutablePair<MessageId,String>> idsAndKeys = RawBatchConverter.extractIdsAndKeys(m2);
+            Assert.assertEquals(idsAndKeys.size(), 1);
+            Assert.assertEquals(idsAndKeys.get(0).getRight(), "key2");
+            m2.close();
+        } finally {
+            reader.closeAsync().get();
         }
-        Assert.assertTrue(keys.isEmpty());
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services