Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/03/09 11:03:20 UTC

[GitHub] ivankelly closed pull request #1228: Issue #570: Move logic of unpersistedbytes to bufferedchannel

URL: https://github.com/apache/bookkeeper/pull/1228

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 0d21d415c..05a20e549 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -41,20 +41,46 @@
     // The buffer used to write operations.
     protected final ByteBuf writeBuffer;
     // The absolute position of the next write operation.
-    protected volatile long position;
+    protected final AtomicLong position;
+
+    /*
+     * If unpersistedBytesBound is non-zero, then after writing to
+     * writeBuffer this class checks whether unpersistedBytes has reached
+     * unpersistedBytesBound and, if so, calls flush.
+     *
+     * This is a best-effort feature, since the 'forceWrite' method is not
+     * synchronized and unpersistedBytes is reset in 'forceWrite' before
+     * calling fileChannel.force.
+     */
+    protected final long unpersistedBytesBound;
+
+    /*
+     * Tracks the number of bytes that have not yet been persisted by
+     * force-writing the FileChannel. The unpersisted bytes could be in
+     * writeBuffer or in the fileChannel's system cache.
+     */
+    protected final AtomicLong unpersistedBytes;
 
     // make constructor to be public for unit test
     public BufferedChannel(FileChannel fc, int capacity) throws IOException {
         // Use the same capacity for read and write buffers.
-        this(fc, capacity, capacity);
+        this(fc, capacity, 0L);
+    }
+
+    public BufferedChannel(FileChannel fc, int capacity, long unpersistedBytesBound) throws IOException {
+        // Use the same capacity for read and write buffers.
+        this(fc, capacity, capacity, unpersistedBytesBound);
     }
 
-    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long unpersistedBytesBound)
+            throws IOException {
         super(fc, readCapacity);
         this.writeCapacity = writeCapacity;
-        this.position = fc.position();
-        this.writeBufferStartPosition.set(position);
+        this.position = new AtomicLong(fc.position());
+        this.writeBufferStartPosition.set(position.get());
         this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+        this.unpersistedBytes = new AtomicLong(0);
+        this.unpersistedBytesBound = unpersistedBytesBound;
     }
 
     @Override
@@ -70,20 +96,34 @@ public void close() throws IOException {
      * @param src The source ByteBuffer which contains the data to be written.
      * @throws IOException if a write operation fails.
      */
-    public synchronized void write(ByteBuf src) throws IOException {
+    public void write(ByteBuf src) throws IOException {
         int copied = 0;
-        int len = src.readableBytes();
-        while (copied < len) {
-            int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
-            writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
-            copied += bytesToCopy;
-
-            // if we have run out of buffer space, we should flush to the file
-            if (!writeBuffer.isWritable()) {
-                flushInternal();
+        boolean shouldForceWrite = false;
+        synchronized (this) {
+            int len = src.readableBytes();
+            while (copied < len) {
+                int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
+                writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
+                copied += bytesToCopy;
+
+                // if we have run out of buffer space, we should flush to the
+                // file
+                if (!writeBuffer.isWritable()) {
+                    flush();
+                }
+            }
+            position.addAndGet(copied);
+            unpersistedBytes.addAndGet(copied);
+            if (unpersistedBytesBound > 0) {
+                if (unpersistedBytes.get() >= unpersistedBytesBound) {
+                    flush();
+                    shouldForceWrite = true;
+                }
             }
         }
-        position += copied;
+        if (shouldForceWrite) {
+            forceWrite(false);
+        }
     }
 
     /**
@@ -91,7 +131,7 @@ public synchronized void write(ByteBuf src) throws IOException {
      * @return
      */
     public long position() {
-        return position;
+        return position.get();
     }
 
     /**
@@ -102,28 +142,27 @@ public long getFileChannelPosition() {
         return writeBufferStartPosition.get();
     }
 
-
     /**
-     * Write any data in the buffer to the file. If sync is set to true, force a sync operation so that
-     * data is persisted to the disk.
-     * @param shouldForceWrite
-     * @throws IOException if the write or sync operation fails.
+     * Calls both the flush and forceWrite methods.
+     *
+     * @param forceMetadata
+     *            - If true then this method is required to force changes to
+     *            both the file's content and metadata to be written to storage;
+     *            otherwise, it need only force content changes to be written
+     * @throws IOException
      */
-    public void flush(boolean shouldForceWrite) throws IOException {
-        synchronized (this) {
-            flushInternal();
-        }
-        if (shouldForceWrite) {
-            forceWrite(false);
-        }
+    public void flushAndForceWrite(boolean forceMetadata) throws IOException {
+        flush();
+        forceWrite(forceMetadata);
     }
 
     /**
      * Write any data in the buffer to the file and advance the writeBufferPosition.
      * Callers are expected to synchronize appropriately
+     *
      * @throws IOException if the write fails.
      */
-    private void flushInternal() throws IOException {
+    public synchronized void flush() throws IOException {
         ByteBuffer toWrite = writeBuffer.internalNioBuffer(0, writeBuffer.writerIndex());
         do {
             fileChannel.write(toWrite);
@@ -132,12 +171,33 @@ private void flushInternal() throws IOException {
         writeBufferStartPosition.set(fileChannel.position());
     }
 
+    /*
+     * force a sync operation so that data is persisted to the disk.
+     */
     public long forceWrite(boolean forceMetadata) throws IOException {
         // This is the point up to which we had flushed to the file system page cache
         // before issuing this force write hence is guaranteed to be made durable by
         // the force write, any flush that happens after this may or may
         // not be flushed
         long positionForceWrite = writeBufferStartPosition.get();
+        /*
+         * Since forceWrite is not called from a synchronized block, to avoid
+         * undercounting unpersistedBytes we reset unpersistedBytes to the
+         * number of bytes currently in writeBuffer.
+         *
+         * Because fileChannel.force is about to be called, bytes already
+         * written to the fileChannel (the system file cache) will be persisted
+         * to disk, so they do not need to be counted in unpersistedBytes.
+         *
+         * As fileChannel.force itself is not called inside the synchronized
+         * block, this is a best effort to neither overcount nor undercount
+         * unpersistedBytes; hence unpersistedBytes is set to
+         * writeBuffer.readableBytes().
+         *
+         */
+        synchronized (this) {
+            unpersistedBytes.set(writeBuffer.readableBytes());
+        }
         fileChannel.force(forceMetadata);
         return positionForceWrite;
     }
@@ -188,4 +248,12 @@ public synchronized void clear() {
         super.clear();
         writeBuffer.clear();
     }
-}
+
+    public synchronized int getNumOfBytesInWriteBuffer() {
+        return writeBuffer.readableBytes();
+    }
+
+    long getUnpersistedBytes() {
+        return unpersistedBytes.get();
+    }
+}
\ No newline at end of file
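
To make the new surface concrete, here is a small sketch of how the reworked BufferedChannel could be driven once this patch lands. The file name, buffer sizes and 64 KB bound are illustrative values, not taken from the patch, and the example is assumed to sit in the org.apache.bookkeeper.bookie package because getUnpersistedBytes() is package-private:

    package org.apache.bookkeeper.bookie;

    import io.netty.buffer.Unpooled;
    import java.io.File;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;

    // Illustrative driver for BufferedChannel(fc, writeCapacity, readCapacity, unpersistedBytesBound).
    public final class BufferedChannelBoundExample {
        public static void main(String[] args) throws Exception {
            File logFile = File.createTempFile("bounded", ".log"); // throwaway file for the example
            logFile.deleteOnExit();
            FileChannel fc = new RandomAccessFile(logFile, "rw").getChannel();

            // 64 KB write buffer, 512 B read buffer, flush + best-effort force roughly every 64 KB written.
            BufferedChannel bc = new BufferedChannel(fc, 65536, 512, 65536);

            byte[] payload = new byte[1024];
            for (int i = 0; i < 1000; i++) {
                // write() itself flushes and force-writes once unpersistedBytes reaches the bound,
                // so the caller no longer needs its own bytes-since-last-flush counter.
                bc.write(Unpooled.wrappedBuffer(payload));
            }

            System.out.println("still unpersisted: " + bc.getUnpersistedBytes());
            bc.flushAndForceWrite(false); // persist whatever is still outstanding
            bc.close();
            fc.close();
        }
    }
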
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index df72899af..1792417ad 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -92,8 +92,9 @@ public BufferedLogChannel(FileChannel fc,
                                   int writeCapacity,
                                   int readCapacity,
                                   long logId,
-                                  File logFile) throws IOException {
-            super(fc, writeCapacity, readCapacity);
+                                  File logFile,
+                                  long unpersistedBytesBound) throws IOException {
+            super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
             this.logId = logId;
             this.entryLogMetadata = new EntryLogMetadata(logId);
             this.logFile = logFile;
@@ -197,9 +198,6 @@ public ConcurrentLongLongHashMap getLedgersMap() {
     static final int MIN_SANE_ENTRY_SIZE = 8 + 8;
     static final long MB = 1024 * 1024;
 
-    private final long flushIntervalInBytes;
-    private final boolean doRegularFlushes;
-    private long bytesWrittenSinceLastFlush = 0;
     private final int maxSaneEntrySize;
 
     final ServerConfiguration conf;
@@ -287,8 +285,6 @@ public EntryLogger(ServerConfiguration conf,
         this.leastUnflushedLogId = logId + 1;
         this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
         this.conf = conf;
-        flushIntervalInBytes = conf.getFlushIntervalInBytes();
-        doRegularFlushes = flushIntervalInBytes > 0;
 
         initialize();
     }
@@ -481,7 +477,7 @@ void createNewLog() throws IOException {
 
             // flush the internal buffer back to filesystem but not sync disk
             // so the readers could access the data from filesystem.
-            logChannel.flush(false);
+            logChannel.flush();
 
             // Append ledgers map at the end of entry log
             appendLedgersMap(logChannel);
@@ -570,7 +566,7 @@ public void accept(long ledgerId, long size) {
         }
         // Flush the ledger's map out before we write the header.
         // Otherwise the header might point to something that is not fully written
-        entryLogChannel.flush(false);
+        entryLogChannel.flush();
 
         // Update the headers with the map offset and count of ledgers
         ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
@@ -668,8 +664,8 @@ private BufferedLogChannel allocateNewLog(String suffix) throws IOException {
             } while (newLogFile == null);
 
             FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
-            BufferedLogChannel logChannel = new BufferedLogChannel(channel,
-                    conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId, newLogFile);
+            BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
+                    conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
             logfileHeader.readerIndex(0);
             logChannel.write(logfileHeader);
 
@@ -817,7 +813,7 @@ void flushRotatedLogs() throws IOException {
         while (chIter.hasNext()) {
             BufferedLogChannel channel = chIter.next();
             try {
-                channel.flush(true);
+                channel.flushAndForceWrite(false);
             } catch (IOException ioe) {
                 // rescue from flush exception, add unflushed channels back
                 synchronized (this) {
@@ -851,8 +847,7 @@ public void flush() throws IOException {
 
     synchronized void flushCurrentLog() throws IOException {
         if (logChannel != null) {
-            logChannel.flush(true);
-            bytesWrittenSinceLastFlush = 0;
+            logChannel.flushAndForceWrite(false);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
             }
@@ -881,9 +876,6 @@ public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) t
         // Create new log if logSizeLimit reached or current disk is full
         boolean createNewLog = shouldCreateNewEntryLog.get();
         if (createNewLog || reachEntryLogLimit) {
-            if (doRegularFlushes) {
-                flushCurrentLog();
-            }
             createNewLog();
             // Reset the flag
             if (createNewLog) {
@@ -901,8 +893,6 @@ public synchronized long addEntry(long ledger, ByteBuf entry, boolean rollLog) t
         logChannel.write(entry);
         logChannel.registerWrittenEntry(ledger, entrySize);
 
-        incrementBytesWrittenAndMaybeFlush(4L + entrySize);
-
         return (logChannel.getLogId() << 32L) | pos;
     }
 
@@ -928,7 +918,7 @@ long addEntryForCompaction(long ledgerId, ByteBuf entry) throws IOException {
     void flushCompactionLog() throws IOException {
         synchronized (compactionLogLock) {
             if (compactionLogChannel != null) {
-                compactionLogChannel.flush(true);
+                compactionLogChannel.flushAndForceWrite(false);
                 LOG.info("Flushed compaction log file {} with logId.",
                     compactionLogChannel.getLogFile(),
                     compactionLogChannel.getLogId());
@@ -970,16 +960,6 @@ void removeCurCompactionLog() {
         }
     }
 
-    private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException {
-        if (!doRegularFlushes) {
-            return;
-        }
-        bytesWrittenSinceLastFlush += bytesWritten;
-        if (bytesWrittenSinceLastFlush > flushIntervalInBytes) {
-            flushCurrentLog();
-        }
-    }
-
     static long logIdForOffset(long offset) {
         return offset >> 32L;
     }
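
For orientation, the sketch below shows how the entry-log flush interval now reaches the channel. Apart from the accessors already visible in the diff, the class and file names are illustrative; the sketch assumes it compiles in the org.apache.bookkeeper.bookie package (BufferedLogChannel is a nested class of EntryLogger), and a bound of zero keeps the old behaviour of leaving flushing entirely to EntryLogger:

    package org.apache.bookkeeper.bookie;

    import java.io.File;
    import java.io.RandomAccessFile;
    import java.nio.channels.FileChannel;
    import org.apache.bookkeeper.conf.ServerConfiguration;

    // Illustrative wiring only: conf.getFlushIntervalInBytes(), which EntryLogger previously used
    // for its bytesWrittenSinceLastFlush bookkeeping, is now handed to the channel as
    // unpersistedBytesBound, and the counting happens inside BufferedChannel.
    public final class EntryLogBoundWiringExample {
        public static void main(String[] args) throws Exception {
            ServerConfiguration conf = new ServerConfiguration();
            File logFile = File.createTempFile("wiring", ".log"); // throwaway file for the example
            logFile.deleteOnExit();
            FileChannel fc = new RandomAccessFile(logFile, "rw").getChannel();

            EntryLogger.BufferedLogChannel ch = new EntryLogger.BufferedLogChannel(
                    fc,
                    conf.getWriteBufferBytes(),
                    conf.getReadBufferBytes(),
                    0L,                               // illustrative logId
                    logFile,
                    conf.getFlushIntervalInBytes());  // 0 disables the in-channel flushing

            ch.close();
            fc.close();
        }
    }
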
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 146a83e24..775219bca 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -986,7 +986,7 @@ public void run() {
                                 writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
                             }
                             journalFlushWatcher.reset().start();
-                            bc.flush(false);
+                            bc.flush();
 
                             for (int i = 0; i < toFlush.size(); i++) {
                                 QueueEntry entry = toFlush.get(i);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
index 8568617e0..80d3a6526 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/LedgerManagerFactory.java
@@ -94,7 +94,7 @@ LedgerUnderreplicationManager newLedgerUnderreplicationManager()
      * @param lm
      *            Layout manager
      */
-    void format(final AbstractConfiguration<?> conf, final LayoutManager lm)
+    void format(AbstractConfiguration<?> conf, LayoutManager lm)
             throws InterruptedException, KeeperException, IOException;
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 9294e27f0..5133e78bb 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -171,7 +171,7 @@ private void writePreV2Journal(File journalDir, int numEntries) throws Exception
 
     private static void moveToPosition(JournalChannel jc, long pos) throws IOException {
         jc.fc.position(pos);
-        jc.bc.position = pos;
+        jc.bc.position.set(pos);
         jc.bc.writeBufferStartPosition.set(pos);
     }
 
@@ -211,7 +211,7 @@ private JournalChannel writeV2Journal(File journalDir, int numEntries) throws Ex
             bc.write(packet);
             packet.release();
         }
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
 
         updateJournalVersion(jc, JournalChannel.V2);
 
@@ -245,7 +245,7 @@ private JournalChannel writeV3Journal(File journalDir, int numEntries, byte[] ma
             bc.write(packet);
             packet.release();
         }
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
 
         updateJournalVersion(jc, JournalChannel.V3);
 
@@ -284,7 +284,7 @@ private JournalChannel writeV4Journal(File journalDir, int numEntries, byte[] ma
         lenBuf.writeInt(packet.readableBytes());
         bc.write(lenBuf);
         bc.write(packet);
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
         updateJournalVersion(jc, JournalChannel.V4);
         return jc;
     }
@@ -324,7 +324,7 @@ private JournalChannel writeV5Journal(File journalDir, int numEntries, byte[] ma
         bc.write(lenBuf);
         bc.write(packet);
         Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
         updateJournalVersion(jc, JournalChannel.V5);
         return jc;
     }
@@ -521,7 +521,7 @@ public void testJunkEndedJournal() throws Exception {
 
         JournalChannel jc = writeV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
         jc.getBufferedChannel().write(Unpooled.wrappedBuffer("JunkJunkJunk".getBytes()));
-        jc.getBufferedChannel().flush(true);
+        jc.getBufferedChannel().flushAndForceWrite(false);
 
         writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
new file mode 100644
index 000000000..86f3a8643
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for BufferedChannel.
+ */
+public class BufferedChannelTest {
+
+    private static Random rand = new Random();
+    private static final int INTERNAL_BUFFER_WRITE_CAPACITY = 65536;
+    private static final int INTERNAL_BUFFER_READ_CAPACITY = 512;
+
+    @Test
+    public void testBufferedChannelWithNoBoundOnUnpersistedBytes() throws Exception {
+        testBufferedChannel(5000, 30, 0, false, false);
+    }
+
+    @Test
+    public void testBufferedChannelWithBoundOnUnpersistedBytes() throws Exception {
+        testBufferedChannel(5000, 30, 5000 * 28, false, false);
+    }
+
+    @Test
+    public void testBufferedChannelWithBoundOnUnpersistedBytesAndFlush() throws Exception {
+        testBufferedChannel(5000, 30, 5000 * 28, true, false);
+    }
+
+    @Test
+    public void testBufferedChannelFlushNoForceWrite() throws Exception {
+        testBufferedChannel(5000, 30, 0, true, false);
+    }
+
+    @Test
+    public void testBufferedChannelForceWriteNoFlush() throws Exception {
+        testBufferedChannel(5000, 30, 0, false, true);
+    }
+
+    @Test
+    public void testBufferedChannelFlushForceWrite() throws Exception {
+        testBufferedChannel(5000, 30, 0, true, true);
+    }
+
+    public void testBufferedChannel(int byteBufLength, int numOfWrites, int unpersistedBytesBound, boolean flush,
+            boolean shouldForceWrite) throws Exception {
+        File newLogFile = File.createTempFile("test", "log");
+        newLogFile.deleteOnExit();
+        FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel();
+
+        BufferedChannel logChannel = new BufferedChannel(fileChannel, INTERNAL_BUFFER_WRITE_CAPACITY,
+                INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
+
+        ByteBuf dataBuf = generateEntry(byteBufLength);
+        dataBuf.markReaderIndex();
+        dataBuf.markWriterIndex();
+
+        for (int i = 0; i < numOfWrites; i++) {
+            logChannel.write(dataBuf);
+            dataBuf.resetReaderIndex();
+            dataBuf.resetWriterIndex();
+        }
+
+        if (flush && shouldForceWrite) {
+            logChannel.flushAndForceWrite(false);
+        } else if (flush) {
+            logChannel.flush();
+        } else if (shouldForceWrite) {
+            logChannel.forceWrite(false);
+        }
+
+        int expectedNumOfUnpersistedBytes = 0;
+
+        if (flush && shouldForceWrite) {
+            /*
+             * If flush is called together with forceWrite,
+             * then expectedNumOfUnpersistedBytes should be zero.
+             */
+            expectedNumOfUnpersistedBytes = 0;
+        } else if (!flush && shouldForceWrite) {
+            /*
+             * If flush is not called explicitly, the internal write buffer is
+             * not flushed here. However, while entries are being added,
+             * BufferedChannel flushes the writeBuffer whenever it reaches its
+             * capacity, so data still ends up in the file cache. In other
+             * words, flush happens implicitly every time the writeBuffer
+             * fills up, and only the remainder stays unpersisted.
+             */
+            expectedNumOfUnpersistedBytes = (byteBufLength * numOfWrites) % INTERNAL_BUFFER_WRITE_CAPACITY;
+        } else {
+            expectedNumOfUnpersistedBytes = (byteBufLength * numOfWrites) - unpersistedBytesBound;
+        }
+
+        Assert.assertEquals("Unpersisted bytes", expectedNumOfUnpersistedBytes, logChannel.getUnpersistedBytes());
+        logChannel.close();
+        fileChannel.close();
+    }
+
+    private static ByteBuf generateEntry(int length) {
+        byte[] data = new byte[length];
+        ByteBuf bb = Unpooled.buffer(length);
+        rand.nextBytes(data);
+        bb.writeBytes(data);
+        return bb;
+    }
+}
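
As a reading aid for the expectedNumOfUnpersistedBytes branches above, here is a tiny arithmetic sketch (not part of the patch) that plugs in the parameters the tests use:

    // Plain arithmetic mirroring the test expectations; class and variable names are illustrative.
    public final class ExpectedUnpersistedBytesExample {
        public static void main(String[] args) {
            int byteBufLength = 5000;
            int numOfWrites = 30;
            int writeCapacity = 65536;                        // INTERNAL_BUFFER_WRITE_CAPACITY
            int totalWritten = byteBufLength * numOfWrites;   // 150000 bytes in total

            // Bound of 5000 * 28 = 140000, no explicit flush/forceWrite: write() force-writes once
            // 140000 bytes accumulate, so only the last two writes remain unpersisted.
            System.out.println(totalWritten - 5000 * 28);     // 10000

            // Bound of 0, forceWrite(false) only: forceWrite resets unpersistedBytes to
            // writeBuffer.readableBytes(), i.e. whatever is left after the implicit flushes
            // that happen each time the 65536-byte writeBuffer fills up.
            System.out.println(totalWritten % writeCapacity); // 18928

            // flushAndForceWrite(false): nothing remains unpersisted.
            System.out.println(0);
        }
    }
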
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index 77f245332..b89a5b019 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -105,7 +105,7 @@ static JournalChannel writeJournal(File journalDir, int numEntries, byte[] maste
             bc.write(packet);
             packet.release();
         }
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
 
         return jc;
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services