You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/06 22:54:45 UTC

[pulsar] branch master updated: Allow to configure KeyShared consumer with relaxed ordering (#7188)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bdfc0d  Allow to configure KeyShared consumer with relaxed ordering (#7188)
1bdfc0d is described below

commit 1bdfc0dedfba5507dad57af56a3dc56737195383
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Jun 6 15:54:28 2020 -0700

    Allow to configure KeyShared consumer with relaxed ordering (#7188)
    
    ### Motivation
    
    In certain cases, it is useful to use the key-shared dispatcher just so that messages with the same key go to the same consumer, even though ordering is not required.
    
    In this case, if we relax the ordering requirement, we can avoid new consumers getting stuck while an existing consumer is still working through its prefetched queue of messages. This is especially relevant when the processing time is high.
---
 ...istentStickyKeyDispatcherMultipleConsumers.java | 38 +++++++++++++--
 .../service/persistent/PersistentSubscription.java | 34 ++-----------
 ...ntStickyKeyDispatcherMultipleConsumersTest.java |  3 +-
 .../apache/pulsar/client/api/KeySharedPolicy.java  | 21 ++++++++
 .../apache/pulsar/common/api/proto/PulsarApi.java  | 57 ++++++++++++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    | 43 +++++++++-------
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 7 files changed, 146 insertions(+), 51 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 5c757c1..a4a532f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.service.persistent;
 
+import com.google.common.base.Preconditions;
+
 import io.netty.util.concurrent.FastThreadLocal;
 
 import java.util.ArrayList;
@@ -32,19 +34,25 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
+import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
+import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
 import org.apache.pulsar.broker.service.EntryBatchSizes;
+import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
+import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.SendMessageInfo;
 import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
 
+    private final boolean allowOutOfOrderDelivery;
     private final StickyKeyConsumerSelector selector;
 
     private boolean isDispatcherStuckOnReplays = false;
@@ -54,12 +62,32 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
      * This means that, in order to preserve ordering, new consumers can only receive old
      * messages, until the mark-delete position will move past this point.
      */
-    private final Map<Consumer, PositionImpl> recentlyJoinedConsumers = new HashMap<>();
+    private final Map<Consumer, PositionImpl> recentlyJoinedConsumers;
 
     PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
-           Subscription subscription, StickyKeyConsumerSelector selector) {
+            Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
         super(topic, cursor, subscription);
-        this.selector = selector;
+
+        this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery();
+        this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? Collections.emptyMap() : new HashMap<>();
+
+        switch (ksm.getKeySharedMode()) {
+        case AUTO_SPLIT:
+            if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
+                selector = new ConsistentHashingStickyKeyConsumerSelector(
+                        conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
+            } else {
+                selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
+            }
+            break;
+
+        case STICKY:
+            this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
+            break;
+
+        default:
+            throw new IllegalArgumentException("Invalid key-shared mode: " + ksm.getKeySharedMode());
+        }
     }
 
     @Override
@@ -69,7 +97,9 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
 
         // If this was the 1st consumer, or if all the messages are already acked, then we
         // don't need to do anything special
-        if (consumerList.size() > 1 && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
+        if (allowOutOfOrderDelivery == false
+                && consumerList.size() > 1
+                && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
             recentlyJoinedConsumers.put(consumer, (PositionImpl) cursor.getReadPosition());
         }
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
index c20da49..8650226 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.broker.service.persistent;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -33,8 +34,6 @@ import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
@@ -50,7 +49,6 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap;
-import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
@@ -58,10 +56,6 @@ import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionFence
 import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionInvalidCursorPosition;
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Dispatcher;
-import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector;
-import org.apache.pulsar.broker.service.StickyKeyConsumerSelector;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
@@ -221,28 +215,10 @@ public class PersistentSubscription implements Subscription {
             case Key_Shared:
                 if (dispatcher == null || dispatcher.getType() != SubType.Key_Shared) {
                     previousDispatcher = dispatcher;
-                    KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta() : KeySharedMeta.getDefaultInstance();
-
-                    switch (ksm.getKeySharedMode()) {
-                        case STICKY:
-                            dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
-                                    new HashRangeExclusiveStickyKeyConsumerSelector());
-                            break;
-
-                        case AUTO_SPLIT:
-                        default:
-                            StickyKeyConsumerSelector selector;
-                            ServiceConfiguration conf = topic.getBrokerService().getPulsar().getConfiguration();
-                            if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
-                                selector = new ConsistentHashingStickyKeyConsumerSelector(
-                                        conf.getSubscriptionKeySharedConsistentHashingReplicaPoints());
-                            } else {
-                                selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
-                            }
-
-                            dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this, selector);
-                            break;
-                    }
+                    KeySharedMeta ksm = consumer.getKeySharedMeta() != null ? consumer.getKeySharedMeta()
+                            : KeySharedMeta.getDefaultInstance();
+                    dispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(topic, cursor, this,
+                            topic.getBrokerService().getPulsar().getConfiguration(), ksm);
                 }
                 break;
             default:
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
index 29b807a..6325f10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.*;
 import org.apache.pulsar.common.api.proto.PulsarApi;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
 import org.apache.pulsar.common.protocol.Commands;
 import org.apache.pulsar.common.protocol.Markers;
 import org.mockito.ArgumentCaptor;
@@ -118,7 +119,7 @@ public class PersistentStickyKeyDispatcherMultipleConsumersTest {
         ).thenReturn(false);
 
         persistentDispatcher = new PersistentStickyKeyDispatcherMultipleConsumers(
-                topicMock, cursorMock, subscriptionMock, new ConsistentHashingStickyKeyConsumerSelector(100));
+                topicMock, cursorMock, subscriptionMock, configMock, KeySharedMeta.getDefaultInstance());
         persistentDispatcher.addConsumer(consumerMock);
         persistentDispatcher.consumerFlow(consumerMock, 1000);
     }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java
index a3ab658..d6040e6 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/KeySharedPolicy.java
@@ -29,6 +29,8 @@ public abstract class KeySharedPolicy {
 
     protected KeySharedMode keySharedMode;
 
+    protected boolean allowOutOfOrderDelivery = false;
+
     public static final int DEFAULT_HASH_RANGE_SIZE = 2 << 15;
 
     public static KeySharedPolicyAutoSplit autoSplitHashRange() {
@@ -41,6 +43,25 @@ public abstract class KeySharedPolicy {
 
     public abstract void validate();
 
+    /**
+     * If enabled, it will relax the ordering requirement, allowing the broker to send out-of-order messages in case of
+     * failures. This will make it faster for new consumers to join without being stalled by an existing slow consumer.
+     *
+     * <p>In this case, a single consumer will still receive all messages of a key, but possibly out of order.
+     *
+     * @param allowOutOfOrderDelivery
+     *            whether to allow for out of order delivery
+     * @return KeySharedPolicy instance
+     */
+    public KeySharedPolicy setAllowOutOfOrderDelivery(boolean allowOutOfOrderDelivery) {
+        this.allowOutOfOrderDelivery = allowOutOfOrderDelivery;
+        return this;
+    }
+
+    public boolean isAllowOutOfOrderDelivery() {
+        return allowOutOfOrderDelivery;
+    }
+
     public KeySharedMode getKeySharedMode() {
         return this.keySharedMode;
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index c0b0143..02fed41 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -10252,6 +10252,10 @@ public final class PulsarApi {
         getHashRangesList();
     org.apache.pulsar.common.api.proto.PulsarApi.IntRange getHashRanges(int index);
     int getHashRangesCount();
+    
+    // optional bool allowOutOfOrderDelivery = 4 [default = false];
+    boolean hasAllowOutOfOrderDelivery();
+    boolean getAllowOutOfOrderDelivery();
   }
   public static final class KeySharedMeta extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -10319,9 +10323,20 @@ public final class PulsarApi {
       return hashRanges_.get(index);
     }
     
+    // optional bool allowOutOfOrderDelivery = 4 [default = false];
+    public static final int ALLOWOUTOFORDERDELIVERY_FIELD_NUMBER = 4;
+    private boolean allowOutOfOrderDelivery_;
+    public boolean hasAllowOutOfOrderDelivery() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public boolean getAllowOutOfOrderDelivery() {
+      return allowOutOfOrderDelivery_;
+    }
+    
     private void initFields() {
       keySharedMode_ = org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMode.AUTO_SPLIT;
       hashRanges_ = java.util.Collections.emptyList();
+      allowOutOfOrderDelivery_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -10356,6 +10371,9 @@ public final class PulsarApi {
       for (int i = 0; i < hashRanges_.size(); i++) {
         output.writeMessage(3, hashRanges_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBool(4, allowOutOfOrderDelivery_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -10372,6 +10390,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeMessageSize(3, hashRanges_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(4, allowOutOfOrderDelivery_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -10489,6 +10511,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000001);
         hashRanges_ = java.util.Collections.emptyList();
         bitField0_ = (bitField0_ & ~0x00000002);
+        allowOutOfOrderDelivery_ = false;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -10531,6 +10555,10 @@ public final class PulsarApi {
           bitField0_ = (bitField0_ & ~0x00000002);
         }
         result.hashRanges_ = hashRanges_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.allowOutOfOrderDelivery_ = allowOutOfOrderDelivery_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -10550,6 +10578,9 @@ public final class PulsarApi {
           }
           
         }
+        if (other.hasAllowOutOfOrderDelivery()) {
+          setAllowOutOfOrderDelivery(other.getAllowOutOfOrderDelivery());
+        }
         return this;
       }
       
@@ -10604,6 +10635,11 @@ public final class PulsarApi {
               addHashRanges(subBuilder.buildPartial());
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000004;
+              allowOutOfOrderDelivery_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -10723,6 +10759,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool allowOutOfOrderDelivery = 4 [default = false];
+      private boolean allowOutOfOrderDelivery_ ;
+      public boolean hasAllowOutOfOrderDelivery() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public boolean getAllowOutOfOrderDelivery() {
+        return allowOutOfOrderDelivery_;
+      }
+      public Builder setAllowOutOfOrderDelivery(boolean value) {
+        bitField0_ |= 0x00000004;
+        allowOutOfOrderDelivery_ = value;
+        
+        return this;
+      }
+      public Builder clearAllowOutOfOrderDelivery() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        allowOutOfOrderDelivery_ = false;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.KeySharedMeta)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index d822519..0222f3c 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -106,6 +106,8 @@ import org.apache.pulsar.common.api.proto.PulsarApi.CommandSuccess;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandUnsubscribe;
 import org.apache.pulsar.common.api.proto.PulsarApi.FeatureFlags;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyLongValue;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta;
+import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMode;
 import org.apache.pulsar.common.api.proto.PulsarApi.KeyValue;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
 import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
@@ -567,24 +569,21 @@ public class Commands {
         subscribeBuilder.setForceTopicCreation(createTopicIfDoesNotExist);
 
         if (keySharedPolicy != null) {
-            switch (keySharedPolicy.getKeySharedMode()) {
-                case AUTO_SPLIT:
-                    subscribeBuilder.setKeySharedMeta(PulsarApi.KeySharedMeta.newBuilder()
-                            .setKeySharedMode(PulsarApi.KeySharedMode.AUTO_SPLIT));
-                    break;
-                case STICKY:
-                    PulsarApi.KeySharedMeta.Builder builder = PulsarApi.KeySharedMeta.newBuilder()
-                            .setKeySharedMode(PulsarApi.KeySharedMode.STICKY);
-                    List<Range> ranges = ((KeySharedPolicy.KeySharedPolicySticky) keySharedPolicy)
-                            .getRanges();
-                    for (Range range : ranges) {
-                        builder.addHashRanges(PulsarApi.IntRange.newBuilder()
-                                .setStart(range.getStart())
-                                .setEnd(range.getEnd()));
-                    }
-                    subscribeBuilder.setKeySharedMeta(builder);
-                    break;
+            KeySharedMeta.Builder keySharedMetaBuilder = PulsarApi.KeySharedMeta.newBuilder();
+            keySharedMetaBuilder.setAllowOutOfOrderDelivery(keySharedPolicy.isAllowOutOfOrderDelivery());
+            keySharedMetaBuilder.setKeySharedMode(convertKeySharedMode(keySharedPolicy.getKeySharedMode()));
+
+            if (keySharedPolicy instanceof KeySharedPolicy.KeySharedPolicySticky) {
+                List<Range> ranges = ((KeySharedPolicy.KeySharedPolicySticky) keySharedPolicy)
+                        .getRanges();
+                for (Range range : ranges) {
+                    keySharedMetaBuilder.addHashRanges(PulsarApi.IntRange.newBuilder()
+                            .setStart(range.getStart())
+                            .setEnd(range.getEnd()));
+                }
             }
+
+            subscribeBuilder.setKeySharedMeta(keySharedMetaBuilder.build());
         }
 
         if (startMessageId != null) {
@@ -611,6 +610,16 @@ public class Commands {
         return res;
     }
 
+
+    private static KeySharedMode convertKeySharedMode(org.apache.pulsar.client.api.KeySharedMode mode) {
+        switch (mode) {
+        case AUTO_SPLIT: return KeySharedMode.AUTO_SPLIT;
+        case STICKY: return KeySharedMode.STICKY;
+        default:
+            throw new IllegalArgumentException("Unexpected key shared mode: " + mode);
+        }
+    }
+
     public static ByteBuf newUnsubscribe(long consumerId, long requestId) {
         CommandUnsubscribe.Builder unsubscribeBuilder = CommandUnsubscribe.newBuilder();
         unsubscribeBuilder.setConsumerId(consumerId);
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index c483fce..be5e4d4 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -290,6 +290,7 @@ enum KeySharedMode {
 message KeySharedMeta {
     required KeySharedMode keySharedMode = 1;
     repeated IntRange hashRanges = 3;
+    optional bool allowOutOfOrderDelivery = 4 [default = false];
 }
 
 message CommandSubscribe {