You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/03/14 21:36:57 UTC

[incubator-pulsar] branch master updated: Add compactedOut flag for batched messages (#1361)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e0c9e7a  Add compactedOut flag for batched messages (#1361)
e0c9e7a is described below

commit e0c9e7ae6adc29ad9ddbf93c6d01e09faafdb645
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Wed Mar 14 22:36:54 2018 +0100

    Add compactedOut flag for batched messages (#1361)
    
    * Add compactedOut flag for batched messages
    
    If batched messages are to be compactable without changing their
    message ids, we need to be able to pad out the batch with empty
    messages, so that the batch index does not change.
    
    For this purpose, this patch introduces a flag, compactedOut. If a
    message in the batch has this flag, it will not be passed to the
    consumer.
---
 .../client/impl/CompactedOutBatchMessageTest.java  | 94 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  8 ++
 .../org/apache/pulsar/common/api/Commands.java     | 36 +++++----
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 57 +++++++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 5 files changed, 182 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
new file mode 100644
index 0000000..af5e804
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/CompactedOutBatchMessageTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import java.io.IOException;
+
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.impl.BatchMessageIdImpl;
+import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
+import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
+import org.apache.pulsar.common.api.proto.PulsarApi.SingleMessageMetadata;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class CompactedOutBatchMessageTest extends ProducerConsumerBase {
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        producerBaseSetup();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testCompactedOutMessages() throws Exception {
+        final String ns1 = "my-property/use/con-ns1";
+        admin.namespaces().createNamespace(ns1);
+        final String topic1 = "persistent://" + ns1 + "/my-topic";
+
+        MessageMetadata metadata = MessageMetadata.newBuilder().setProducerName("foobar")
+            .setSequenceId(1).setPublishTime(1).setNumMessagesInBatch(3).build();
+
+        // build a buffer with 4 messages; all but the third (key3) are compacted out
+        ByteBuf batchBuffer = Unpooled.buffer(1000);
+        Commands.serializeSingleMessageInBatchWithPayload(
+                SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key1"),
+                Unpooled.EMPTY_BUFFER, batchBuffer);
+        Commands.serializeSingleMessageInBatchWithPayload(
+                SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key2"),
+                Unpooled.EMPTY_BUFFER, batchBuffer);
+        Commands.serializeSingleMessageInBatchWithPayload(
+                SingleMessageMetadata.newBuilder().setCompactedOut(false).setPartitionKey("key3"),
+                Unpooled.EMPTY_BUFFER, batchBuffer);
+        Commands.serializeSingleMessageInBatchWithPayload(
+                SingleMessageMetadata.newBuilder().setCompactedOut(true).setPartitionKey("key4"),
+                Unpooled.EMPTY_BUFFER, batchBuffer);
+
+        try (ConsumerImpl<byte[]> consumer
+             = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topic1)
+                .subscriptionName("my-subscriber-name").subscribe()) {
+            // feed the hand-built batch straight into the consumer
+            consumer.receiveIndividualMessagesFromBatch(metadata, batchBuffer,
+                                                        MessageIdData.newBuilder().setLedgerId(1234)
+                                                        .setEntryId(567).build(), consumer.cnx());
+            Message m = consumer.receive();
+            assertEquals(((BatchMessageIdImpl)m.getMessageId()).getLedgerId(), 1234);
+            assertEquals(((BatchMessageIdImpl)m.getMessageId()).getEntryId(), 567);
+            assertEquals(((BatchMessageIdImpl)m.getMessageId()).getBatchIndex(), 2);
+            assertEquals(m.getKey(), "key3");
+
+            assertEquals(consumer.numMessagesInQueue(), 0);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a76529d..d631fd6 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -957,6 +957,14 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                     ++skippedMessages;
                     continue;
                 }
+                if (singleMessageMetadataBuilder.getCompactedOut()) {
+                    // message has been compacted out, so don't send to the user
+                    singleMessagePayload.release();
+                    singleMessageMetadataBuilder.recycle();
+
+                    ++skippedMessages;
+                    continue;
+                }
 
                 BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
                         messageId.getEntryId(), getPartitionIndex(), i);
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 093f944..59d6780 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -833,24 +833,12 @@ public class Commands {
         return builder.getSequenceId();
     }
 
-    public static ByteBuf serializeSingleMessageInBatchWithPayload(PulsarApi.MessageMetadata.Builder msgBuilder,
+    public static ByteBuf serializeSingleMessageInBatchWithPayload(
+            PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder,
             ByteBuf payload, ByteBuf batchBuffer) {
-
-        // build single message meta-data
-        PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
-                .newBuilder();
-        if (msgBuilder.hasPartitionKey()) {
-            singleMessageMetadataBuilder = singleMessageMetadataBuilder.setPartitionKey(msgBuilder.getPartitionKey());
-        }
-        if (!msgBuilder.getPropertiesList().isEmpty()) {
-            singleMessageMetadataBuilder = singleMessageMetadataBuilder
-                    .addAllProperties(msgBuilder.getPropertiesList());
-        }
         int payLoadSize = payload.readableBytes();
         PulsarApi.SingleMessageMetadata singleMessageMetadata = singleMessageMetadataBuilder.setPayloadSize(payLoadSize)
                 .build();
-        singleMessageMetadataBuilder.recycle();
-
         // serialize meta-data size, meta-data and payload for single message in batch
         int singleMsgMetadataSize = singleMessageMetadata.getSerializedSize();
         try {
@@ -865,6 +853,26 @@ public class Commands {
         return batchBuffer.writeBytes(payload);
     }
 
+    public static ByteBuf serializeSingleMessageInBatchWithPayload(PulsarApi.MessageMetadata.Builder msgBuilder,
+            ByteBuf payload, ByteBuf batchBuffer) {
+
+        // build single message meta-data
+        PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder = PulsarApi.SingleMessageMetadata
+                .newBuilder();
+        if (msgBuilder.hasPartitionKey()) {
+            singleMessageMetadataBuilder = singleMessageMetadataBuilder.setPartitionKey(msgBuilder.getPartitionKey());
+        }
+        if (!msgBuilder.getPropertiesList().isEmpty()) {
+            singleMessageMetadataBuilder = singleMessageMetadataBuilder
+                    .addAllProperties(msgBuilder.getPropertiesList());
+        }
+        try {
+            return serializeSingleMessageInBatchWithPayload(singleMessageMetadataBuilder, payload, batchBuffer);
+        } finally {
+            singleMessageMetadataBuilder.recycle();
+        }
+    }
+
     public static ByteBuf deSerializeSingleMessageInBatch(ByteBuf uncompressedPayload,
             PulsarApi.SingleMessageMetadata.Builder singleMessageMetadataBuilder, int index, int batchSize)
             throws IOException {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 0a362b3..4776ee1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -4623,6 +4623,10 @@ public final class PulsarApi {
     // required int32 payload_size = 3;
     boolean hasPayloadSize();
     int getPayloadSize();
+    
+    // optional bool compacted_out = 4 [default = false];
+    boolean hasCompactedOut();
+    boolean getCompactedOut();
   }
   public static final class SingleMessageMetadata extends
       com.google.protobuf.GeneratedMessageLite
@@ -4724,10 +4728,21 @@ public final class PulsarApi {
       return payloadSize_;
     }
     
+    // optional bool compacted_out = 4 [default = false];
+    public static final int COMPACTED_OUT_FIELD_NUMBER = 4;
+    private boolean compactedOut_;
+    public boolean hasCompactedOut() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public boolean getCompactedOut() {
+      return compactedOut_;
+    }
+    
     private void initFields() {
       properties_ = java.util.Collections.emptyList();
       partitionKey_ = "";
       payloadSize_ = 0;
+      compactedOut_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -4765,6 +4780,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeInt32(3, payloadSize_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBool(4, compactedOut_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -4785,6 +4803,10 @@ public final class PulsarApi {
         size += com.google.protobuf.CodedOutputStream
           .computeInt32Size(3, payloadSize_);
       }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(4, compactedOut_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -4904,6 +4926,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000002);
         payloadSize_ = 0;
         bitField0_ = (bitField0_ & ~0x00000004);
+        compactedOut_ = false;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -4950,6 +4974,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000002;
         }
         result.payloadSize_ = payloadSize_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.compactedOut_ = compactedOut_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -4972,6 +5000,9 @@ public final class PulsarApi {
         if (other.hasPayloadSize()) {
           setPayloadSize(other.getPayloadSize());
         }
+        if (other.hasCompactedOut()) {
+          setCompactedOut(other.getCompactedOut());
+        }
         return this;
       }
       
@@ -5027,6 +5058,11 @@ public final class PulsarApi {
               payloadSize_ = input.readInt32();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              compactedOut_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -5179,6 +5215,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool compacted_out = 4 [default = false];
+      private boolean compactedOut_ ;
+      public boolean hasCompactedOut() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public boolean getCompactedOut() {
+        return compactedOut_;
+      }
+      public Builder setCompactedOut(boolean value) {
+        bitField0_ |= 0x00000008;
+        compactedOut_ = value;
+        
+        return this;
+      }
+      public Builder clearCompactedOut() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        compactedOut_ = false;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.SingleMessageMetadata)
     }
     
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 00f840b..0c0a501 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -104,6 +104,7 @@ message SingleMessageMetadata {
 	repeated KeyValue properties    = 1;
 	optional string partition_key   = 2;
 	required int32 payload_size	= 3;
+        optional bool compacted_out     = 4 [default = false];
 }
 
 enum ServerError {

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.