You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/22 14:04:50 UTC

[GitHub] sijie closed pull request #3118: UnAcked message tracker based on TimePartition

sijie closed pull request #3118: UnAcked message tracker based on TimePartition
URL: https://github.com/apache/pulsar/pull/3118
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
index 87f8d366dc..6db887b37f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PerMessageUnAcknowledgedRedeliveryTest.java
@@ -112,7 +112,7 @@ public void testSharedAckedNormalTopic() throws Exception {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+        Thread.sleep(ackTimeOutMillis);
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -210,7 +210,7 @@ public void testExclusiveAckedNormalTopic() throws Exception {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+        Thread.sleep(ackTimeOutMillis);
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -308,7 +308,7 @@ public void testFailoverAckedNormalTopic() throws Exception {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((ConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
+        Thread.sleep(ackTimeOutMillis);
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
@@ -415,8 +415,7 @@ public void testSharedAckedPartitionedTopic() throws Exception {
         assertEquals(received, 5);
 
         // 7. Simulate ackTimeout
-        ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
-        ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
+        Thread.sleep(ackTimeOutMillis);
 
         // 8. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 55be9ecbd4..b791033c9d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -381,8 +381,7 @@ public void testConsumerUnackedRedelivery() throws Exception {
         assertEquals(received, totalMessages);
 
         // 8. Simulate ackTimeout
-        ((MultiTopicsConsumerImpl<byte[]>) consumer).getUnAckedMessageTracker().toggle();
-        ((MultiTopicsConsumerImpl<byte[]>) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
+        Thread.sleep(ackTimeOutMillis);
 
         // 9. producer publish more messages
         for (int i = 0; i < totalMessages / 3; i++) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
index 5ffba38ffc..4539d6d6e0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerConfiguration.java
@@ -20,13 +20,13 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 
 import java.io.Serializable;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
-import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+
 /**
  * Class specifying the configuration of a consumer. In Exclusive subscription, only a single consumer is allowed to
  * attach to the subscription. Other consumers will get an error message. In Shared subscription, multiple consumers
@@ -60,6 +60,10 @@ public long getAckTimeoutMillis() {
         return conf.getAckTimeoutMillis();
     }
 
+    public long getTickDurationMillis() {
+        return conf.getTickDurationMillis();
+    }
+
     /**
      * Set the timeout for unacked messages, truncated to the nearest millisecond. The timeout needs to be greater than
      * 10 seconds.
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index b83804be8c..4ee0feb68a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -49,7 +49,6 @@
 import org.apache.pulsar.common.util.FutureUtil;
 
 import com.google.common.collect.Lists;
-
 import lombok.NonNull;
 
 public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 3e2d4d6a8a..df65361b9a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -27,7 +27,6 @@
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Iterables;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.util.Timeout;
 
@@ -52,11 +51,11 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerStats;
+import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
@@ -66,7 +65,6 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.common.api.Commands;
-import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.common.api.EncryptionContext;
 import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
 import org.apache.pulsar.common.api.PulsarDecoder;
@@ -188,7 +186,11 @@
         }
 
         if (conf.getAckTimeoutMillis() != 0) {
-            this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
+            if (conf.getTickDurationMillis() > 0) {
+                this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
+            } else {
+                this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
+            }
         } else {
             this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 41821ad93a..e9ef632d1a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -99,7 +99,11 @@
         this.allTopicPartitionsNumber = new AtomicInteger(0);
 
         if (conf.getAckTimeoutMillis() != 0) {
-            this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+            if (conf.getTickDurationMillis() > 0) {
+                this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
+            } else {
+                this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+            }
         } else {
             this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
         }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 266eb3b9db..d31d353ac8 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -18,29 +18,36 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.base.Preconditions;
 import io.netty.util.Timeout;
 import io.netty.util.TimerTask;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.apache.pulsar.client.api.MessageId;
-import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 public class UnAckedMessageTracker implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);
-    protected ConcurrentOpenHashSet<MessageId> currentSet;
-    protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
-    private final ReentrantReadWriteLock readWriteLock;
+
+    protected final ConcurrentHashMap<MessageId, ConcurrentOpenHashSet<MessageId>> messageIdPartitionMap;
+    protected final LinkedList<ConcurrentOpenHashSet<MessageId>> timePartitions;
+
     protected final Lock readLock;
-    private final Lock writeLock;
-    private Timeout timeout;
+    protected final Lock writeLock;
 
     public static final UnAckedMessageTrackerDisabled UNACKED_MESSAGE_TRACKER_DISABLED = new UnAckedMessageTrackerDisabled();
+    private final long ackTimeoutMillis;
+    private final long tickDurationInMs;
 
     private static class UnAckedMessageTrackerDisabled extends UnAckedMessageTracker {
         @Override
@@ -67,116 +74,138 @@ public void close() {
         }
     }
 
+    private Timeout timeout;
+
     public UnAckedMessageTracker() {
-        readWriteLock = null;
         readLock = null;
         writeLock = null;
+        timePartitions = null;
+        messageIdPartitionMap = null;
+        this.ackTimeoutMillis = 0;
+        this.tickDurationInMs = 0;
     }
 
     public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis) {
-        currentSet = new ConcurrentOpenHashSet<MessageId>();
-        oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
-        readWriteLock = new ReentrantReadWriteLock();
-        readLock = readWriteLock.readLock();
-        writeLock = readWriteLock.writeLock();
-        start(client, consumerBase, ackTimeoutMillis);
+        this(client, consumerBase, ackTimeoutMillis, ackTimeoutMillis);
     }
 
-    public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis) {
-        this.stop();
+    public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis, long tickDurationInMs) {
+        Preconditions.checkArgument(tickDurationInMs > 0 && ackTimeoutMillis >= tickDurationInMs);
+        this.ackTimeoutMillis = ackTimeoutMillis;
+        this.tickDurationInMs = tickDurationInMs;
+        ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+        this.readLock = readWriteLock.readLock();
+        this.writeLock = readWriteLock.writeLock();
+        this.messageIdPartitionMap = new ConcurrentHashMap<>();
+        this.timePartitions = new LinkedList<>();
+
+        int blankPartitions = (int)Math.ceil((double)this.ackTimeoutMillis / this.tickDurationInMs);
+        for (int i = 0; i < blankPartitions + 1; i++) {
+            timePartitions.add(new ConcurrentOpenHashSet<>());
+        }
+
         timeout = client.timer().newTimeout(new TimerTask() {
             @Override
             public void run(Timeout t) throws Exception {
-                if (isAckTimeout()) {
-                    log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
-                    Set<MessageId> messageIds = new HashSet<>();
-                    oldOpenSet.forEach(messageIds::add);
-                    oldOpenSet.clear();
+                Set<MessageId> messageIds = new HashSet<>();
+                writeLock.lock();
+                try {
+                    timePartitions.addLast(new ConcurrentOpenHashSet<>());
+                    ConcurrentOpenHashSet<MessageId> headPartition = timePartitions.removeFirst();
+                    if (!headPartition.isEmpty()) {
+                        log.warn("[{}] {} messages have timed-out", consumerBase, timePartitions.size());
+                        headPartition.forEach(messageId -> {
+                            messageIds.add(messageId);
+                            messageIdPartitionMap.remove(messageId);
+                        });
+                    }
+                } finally {
+                    writeLock.unlock();
+                }
+                if (messageIds.size() > 0) {
                     consumerBase.redeliverUnacknowledgedMessages(messageIds);
                 }
-                toggle();
-                timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
+                timeout = client.timer().newTimeout(this, tickDurationInMs, TimeUnit.MILLISECONDS);
             }
-        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
+        }, this.tickDurationInMs, TimeUnit.MILLISECONDS);
     }
 
-    void toggle() {
+    public void clear() {
         writeLock.lock();
         try {
-            ConcurrentOpenHashSet<MessageId> temp = currentSet;
-            currentSet = oldOpenSet;
-            oldOpenSet = temp;
+            messageIdPartitionMap.clear();
+            timePartitions.clear();
+            int blankPartitions = (int)Math.ceil((double)ackTimeoutMillis / tickDurationInMs);
+            for (int i = 0; i < blankPartitions + 1; i++) {
+                timePartitions.add(new ConcurrentOpenHashSet<>());
+            }
         } finally {
             writeLock.unlock();
         }
     }
 
-    public void clear() {
-        readLock.lock();
-        try {
-            currentSet.clear();
-            oldOpenSet.clear();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    public boolean add(MessageId m) {
-        readLock.lock();
+    public boolean add(MessageId messageId) {
+        writeLock.lock();
         try {
-            oldOpenSet.remove(m);
-            return currentSet.add(m);
+            ConcurrentOpenHashSet<MessageId> partition = timePartitions.peekLast();
+            messageIdPartitionMap.put(messageId, partition);
+            return partition.add(messageId);
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
         }
-
     }
 
     boolean isEmpty() {
         readLock.lock();
         try {
-            return currentSet.isEmpty() && oldOpenSet.isEmpty();
+            return messageIdPartitionMap.isEmpty();
         } finally {
             readLock.unlock();
         }
     }
 
-    public boolean remove(MessageId m) {
-        readLock.lock();
+    public boolean remove(MessageId messageId) {
+        writeLock.lock();
         try {
-            return currentSet.remove(m) || oldOpenSet.remove(m);
+            boolean removed = false;
+            ConcurrentOpenHashSet<MessageId> exist = messageIdPartitionMap.remove(messageId);
+            if (exist != null) {
+                removed = exist.remove(messageId);
+            }
+            return removed;
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
         }
     }
 
     long size() {
         readLock.lock();
         try {
-            return currentSet.size() + oldOpenSet.size();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    private boolean isAckTimeout() {
-        readLock.lock();
-        try {
-            return !oldOpenSet.isEmpty();
+            return messageIdPartitionMap.size();
         } finally {
             readLock.unlock();
         }
     }
 
     public int removeMessagesTill(MessageId msgId) {
-        readLock.lock();
+        writeLock.lock();
         try {
-            int currentSetRemovedMsgCount = currentSet.removeIf(m -> (m.compareTo(msgId) <= 0));
-            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> (m.compareTo(msgId) <= 0));
-
-            return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+            int removed = 0;
+            Iterator<MessageId> iterator = messageIdPartitionMap.keySet().iterator();
+            while (iterator.hasNext()) {
+                MessageId messageId = iterator.next();
+                if (messageId.compareTo(msgId) <= 0) {
+                    ConcurrentOpenHashSet<MessageId> exist = messageIdPartitionMap.get(messageId);
+                    if (exist != null) {
+                        exist.remove(messageId);
+                    }
+                    iterator.remove();
+                    removed ++;
+                }
+            }
+            return removed;
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
index f500fda040..afe9a5e072 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -18,7 +18,10 @@
  */
 package org.apache.pulsar.client.impl;
 
-import static com.google.common.base.Preconditions.checkState;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
+
+import java.util.Iterator;
 
 public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
 
@@ -26,23 +29,30 @@ public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase<?> consu
         super(client, consumerBase, ackTimeoutMillis);
     }
 
+    public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis, long tickDurationMillis) {
+        super(client, consumerBase, ackTimeoutMillis, tickDurationMillis);
+    }
+
     public int removeTopicMessages(String topicName) {
-        readLock.lock();
+        writeLock.lock();
         try {
-            int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
-                checkState(m instanceof TopicMessageIdImpl,
-                    "message should be of type TopicMessageIdImpl");
-                return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
-            });
-            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
-                checkState(m instanceof TopicMessageIdImpl,
-                    "message should be of type TopicMessageIdImpl");
-                return ((TopicMessageIdImpl)m).getTopicPartitionName().contains(topicName);
-            });
-
-            return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+            int removed = 0;
+            Iterator<MessageId> iterator = messageIdPartitionMap.keySet().iterator();
+            while (iterator.hasNext()) {
+                MessageId messageId = iterator.next();
+                if (messageId instanceof TopicMessageIdImpl &&
+                        ((TopicMessageIdImpl)messageId).getTopicPartitionName().contains(topicName)) {
+                    ConcurrentOpenHashSet<MessageId> exist = messageIdPartitionMap.get(messageId);
+                    if (exist != null) {
+                        exist.remove(messageId);
+                    }
+                    iterator.remove();
+                    removed ++;
+                }
+            }
+            return removed;
         } finally {
-            readLock.unlock();
+            writeLock.unlock();
         }
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 7e92d1245d..2c56b61dd0 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -23,22 +23,22 @@
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import java.io.Serializable;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import lombok.Data;
-
-import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.CryptoKeyReader;
 import org.apache.pulsar.client.api.DeadLetterPolicy;
 import org.apache.pulsar.client.api.MessageListener;
-import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Mode;
 
 @Data
@@ -69,6 +69,8 @@
 
     private long ackTimeoutMillis = 0;
 
+    private long tickDurationMillis = 1000;
+
     private int priorityLevel = 0;
 
     @JsonIgnore


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services