You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by iv...@apache.org on 2018/03/09 11:03:18 UTC

[bookkeeper] branch master updated: Issue #570: Move logic of unpersistedbytes to bufferedchannel

This is an automated email from the ASF dual-hosted git repository.

ivank pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 30261ea  Issue #570: Move logic of unpersistedbytes to bufferedchannel
30261ea is described below

commit 30261eae3fd8ab25239d57cfb86a200d5f7b6b7d
Author: Charan Reddy Guttapalem <cg...@salesforce.com>
AuthorDate: Fri Mar 9 12:03:12 2018 +0100

    Issue #570: Move logic of unpersistedbytes to bufferedchannel
    
    Descriptions of the changes in this PR:
    
    This is < sub-task2 > of Issue #570.
    
    https://github.com/apache/bookkeeper/commit/26b09abb4202362ca37d6944ce75eb2a3309dc3c
    introduced the flushEntrylogBytes factor. But it is structurally correct to have this
    logic in BufferedChannel, rather than in EntryLogger, since it is a parameter of BufferedChannel.
    
    Master Issue: #570
    
    Author: cguttapalem <cg...@salesforce.com>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Sijie Guo <si...@apache.org>
    
    This closes #1228 from reddycharan/fixunpersistedbytes, closes #570
---
 .../apache/bookkeeper/bookie/BufferedChannel.java  | 130 +++++++++++++++-----
 .../org/apache/bookkeeper/bookie/EntryLogger.java  |  40 ++----
 .../java/org/apache/bookkeeper/bookie/Journal.java |   2 +-
 .../bookkeeper/bookie/BookieJournalTest.java       |  12 +-
 .../bookkeeper/bookie/BufferedChannelTest.java     | 134 +++++++++++++++++++++
 .../org/apache/bookkeeper/bookie/UpgradeTest.java  |   2 +-
 6 files changed, 251 insertions(+), 69 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
index 0d21d41..05a20e5 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BufferedChannel.java
@@ -41,20 +41,46 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
     // The buffer used to write operations.
     protected final ByteBuf writeBuffer;
     // The absolute position of the next write operation.
-    protected volatile long position;
+    protected final AtomicLong position;
+
+    /*
+     * if unpersistedBytesBound is non-zero value, then after writing to
+     * writeBuffer, it will check if the unpersistedBytes is greater than
+     * unpersistedBytesBound and then calls flush method if it is greater.
+     *
+     * It is a best-effort feature, since 'forceWrite' method is not
+     * synchronized and unpersistedBytes is reset in 'forceWrite' method before
+     * calling fileChannel.force
+     */
+    protected final long unpersistedBytesBound;
+
+    /*
+     * it tracks the number of bytes which are not persisted yet by force
+     * writing the FileChannel. The unpersisted bytes could be in writeBuffer or
+     * in fileChannel system cache.
+     */
+    protected final AtomicLong unpersistedBytes;
 
     // make constructor to be public for unit test
     public BufferedChannel(FileChannel fc, int capacity) throws IOException {
         // Use the same capacity for read and write buffers.
-        this(fc, capacity, capacity);
+        this(fc, capacity, 0L);
+    }
+
+    public BufferedChannel(FileChannel fc, int capacity, long unpersistedBytesBound) throws IOException {
+        // Use the same capacity for read and write buffers.
+        this(fc, capacity, capacity, unpersistedBytesBound);
     }
 
-    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity) throws IOException {
+    public BufferedChannel(FileChannel fc, int writeCapacity, int readCapacity, long unpersistedBytesBound)
+            throws IOException {
         super(fc, readCapacity);
         this.writeCapacity = writeCapacity;
-        this.position = fc.position();
-        this.writeBufferStartPosition.set(position);
+        this.position = new AtomicLong(fc.position());
+        this.writeBufferStartPosition.set(position.get());
         this.writeBuffer = ByteBufAllocator.DEFAULT.directBuffer(writeCapacity);
+        this.unpersistedBytes = new AtomicLong(0);
+        this.unpersistedBytesBound = unpersistedBytesBound;
     }
 
     @Override
@@ -70,20 +96,34 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
      * @param src The source ByteBuffer which contains the data to be written.
      * @throws IOException if a write operation fails.
      */
-    public synchronized void write(ByteBuf src) throws IOException {
+    public void write(ByteBuf src) throws IOException {
         int copied = 0;
-        int len = src.readableBytes();
-        while (copied < len) {
-            int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
-            writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
-            copied += bytesToCopy;
-
-            // if we have run out of buffer space, we should flush to the file
-            if (!writeBuffer.isWritable()) {
-                flushInternal();
+        boolean shouldForceWrite = false;
+        synchronized (this) {
+            int len = src.readableBytes();
+            while (copied < len) {
+                int bytesToCopy = Math.min(src.readableBytes() - copied, writeBuffer.writableBytes());
+                writeBuffer.writeBytes(src, src.readerIndex() + copied, bytesToCopy);
+                copied += bytesToCopy;
+
+                // if we have run out of buffer space, we should flush to the
+                // file
+                if (!writeBuffer.isWritable()) {
+                    flush();
+                }
+            }
+            position.addAndGet(copied);
+            unpersistedBytes.addAndGet(copied);
+            if (unpersistedBytesBound > 0) {
+                if (unpersistedBytes.get() >= unpersistedBytesBound) {
+                    flush();
+                    shouldForceWrite = true;
+                }
             }
         }
-        position += copied;
+        if (shouldForceWrite) {
+            forceWrite(false);
+        }
     }
 
     /**
@@ -91,7 +131,7 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
      * @return
      */
     public long position() {
-        return position;
+        return position.get();
     }
 
     /**
@@ -102,28 +142,27 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
         return writeBufferStartPosition.get();
     }
 
-
     /**
-     * Write any data in the buffer to the file. If sync is set to true, force a sync operation so that
-     * data is persisted to the disk.
-     * @param shouldForceWrite
-     * @throws IOException if the write or sync operation fails.
+     * calls both flush and forceWrite methods.
+     *
+     * @param forceMetadata
+     *            - If true then this method is required to force changes to
+     *            both the file's content and metadata to be written to storage;
+     *            otherwise, it need only force content changes to be written
+     * @throws IOException
      */
-    public void flush(boolean shouldForceWrite) throws IOException {
-        synchronized (this) {
-            flushInternal();
-        }
-        if (shouldForceWrite) {
-            forceWrite(false);
-        }
+    public void flushAndForceWrite(boolean forceMetadata) throws IOException {
+        flush();
+        forceWrite(forceMetadata);
     }
 
     /**
      * Write any data in the buffer to the file and advance the writeBufferPosition.
      * Callers are expected to synchronize appropriately
+     *
      * @throws IOException if the write fails.
      */
-    private void flushInternal() throws IOException {
+    public synchronized void flush() throws IOException {
         ByteBuffer toWrite = writeBuffer.internalNioBuffer(0, writeBuffer.writerIndex());
         do {
             fileChannel.write(toWrite);
@@ -132,12 +171,33 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
         writeBufferStartPosition.set(fileChannel.position());
     }
 
+    /*
+     * force a sync operation so that data is persisted to the disk.
+     */
     public long forceWrite(boolean forceMetadata) throws IOException {
         // This is the point up to which we had flushed to the file system page cache
         // before issuing this force write hence is guaranteed to be made durable by
         // the force write, any flush that happens after this may or may
         // not be flushed
         long positionForceWrite = writeBufferStartPosition.get();
+        /*
+         * since forceWrite method is not called in synchronized block, to make
+         * sure we are not undercounting unpersistedBytes, setting
+         * unpersistedBytes to the current number of bytes in writeBuffer.
+         *
+         * since we are calling fileChannel.force, bytes which are written to
+         * filechannel (system filecache) will be persisted to the disk. So we
+         * dont need to consider those bytes for setting value to
+         * unpersistedBytes.
+         *
+         * In this method fileChannel.force is not called in synchronized block, so
+         * we are doing best efforts to not overcount or undercount unpersistedBytes.
+         * Hence setting writeBuffer.readableBytes() to unpersistedBytes.
+         *
+         */
+        synchronized (this) {
+            unpersistedBytes.set(writeBuffer.readableBytes());
+        }
         fileChannel.force(forceMetadata);
         return positionForceWrite;
     }
@@ -188,4 +248,12 @@ public class BufferedChannel extends BufferedReadChannel implements Closeable {
         super.clear();
         writeBuffer.clear();
     }
-}
+
+    public synchronized int getNumOfBytesInWriteBuffer() {
+        return writeBuffer.readableBytes();
+    }
+
+    long getUnpersistedBytes() {
+        return unpersistedBytes.get();
+    }
+}
\ No newline at end of file
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
index df72899..1792417 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryLogger.java
@@ -92,8 +92,9 @@ public class EntryLogger {
                                   int writeCapacity,
                                   int readCapacity,
                                   long logId,
-                                  File logFile) throws IOException {
-            super(fc, writeCapacity, readCapacity);
+                                  File logFile,
+                                  long unpersistedBytesBound) throws IOException {
+            super(fc, writeCapacity, readCapacity, unpersistedBytesBound);
             this.logId = logId;
             this.entryLogMetadata = new EntryLogMetadata(logId);
             this.logFile = logFile;
@@ -197,9 +198,6 @@ public class EntryLogger {
     static final int MIN_SANE_ENTRY_SIZE = 8 + 8;
     static final long MB = 1024 * 1024;
 
-    private final long flushIntervalInBytes;
-    private final boolean doRegularFlushes;
-    private long bytesWrittenSinceLastFlush = 0;
     private final int maxSaneEntrySize;
 
     final ServerConfiguration conf;
@@ -287,8 +285,6 @@ public class EntryLogger {
         this.leastUnflushedLogId = logId + 1;
         this.entryLoggerAllocator = new EntryLoggerAllocator(logId);
         this.conf = conf;
-        flushIntervalInBytes = conf.getFlushIntervalInBytes();
-        doRegularFlushes = flushIntervalInBytes > 0;
 
         initialize();
     }
@@ -481,7 +477,7 @@ public class EntryLogger {
 
             // flush the internal buffer back to filesystem but not sync disk
             // so the readers could access the data from filesystem.
-            logChannel.flush(false);
+            logChannel.flush();
 
             // Append ledgers map at the end of entry log
             appendLedgersMap(logChannel);
@@ -570,7 +566,7 @@ public class EntryLogger {
         }
         // Flush the ledger's map out before we write the header.
         // Otherwise the header might point to something that is not fully written
-        entryLogChannel.flush(false);
+        entryLogChannel.flush();
 
         // Update the headers with the map offset and count of ledgers
         ByteBuffer mapInfo = ByteBuffer.allocate(8 + 4);
@@ -668,8 +664,8 @@ public class EntryLogger {
             } while (newLogFile == null);
 
             FileChannel channel = new RandomAccessFile(newLogFile, "rw").getChannel();
-            BufferedLogChannel logChannel = new BufferedLogChannel(channel,
-                    conf.getWriteBufferBytes(), conf.getReadBufferBytes(), preallocatedLogId, newLogFile);
+            BufferedLogChannel logChannel = new BufferedLogChannel(channel, conf.getWriteBufferBytes(),
+                    conf.getReadBufferBytes(), preallocatedLogId, newLogFile, conf.getFlushIntervalInBytes());
             logfileHeader.readerIndex(0);
             logChannel.write(logfileHeader);
 
@@ -817,7 +813,7 @@ public class EntryLogger {
         while (chIter.hasNext()) {
             BufferedLogChannel channel = chIter.next();
             try {
-                channel.flush(true);
+                channel.flushAndForceWrite(false);
             } catch (IOException ioe) {
                 // rescue from flush exception, add unflushed channels back
                 synchronized (this) {
@@ -851,8 +847,7 @@ public class EntryLogger {
 
     synchronized void flushCurrentLog() throws IOException {
         if (logChannel != null) {
-            logChannel.flush(true);
-            bytesWrittenSinceLastFlush = 0;
+            logChannel.flushAndForceWrite(false);
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Flush and sync current entry logger {}.", logChannel.getLogId());
             }
@@ -881,9 +876,6 @@ public class EntryLogger {
         // Create new log if logSizeLimit reached or current disk is full
         boolean createNewLog = shouldCreateNewEntryLog.get();
         if (createNewLog || reachEntryLogLimit) {
-            if (doRegularFlushes) {
-                flushCurrentLog();
-            }
             createNewLog();
             // Reset the flag
             if (createNewLog) {
@@ -901,8 +893,6 @@ public class EntryLogger {
         logChannel.write(entry);
         logChannel.registerWrittenEntry(ledger, entrySize);
 
-        incrementBytesWrittenAndMaybeFlush(4L + entrySize);
-
         return (logChannel.getLogId() << 32L) | pos;
     }
 
@@ -928,7 +918,7 @@ public class EntryLogger {
     void flushCompactionLog() throws IOException {
         synchronized (compactionLogLock) {
             if (compactionLogChannel != null) {
-                compactionLogChannel.flush(true);
+                compactionLogChannel.flushAndForceWrite(false);
                 LOG.info("Flushed compaction log file {} with logId.",
                     compactionLogChannel.getLogFile(),
                     compactionLogChannel.getLogId());
@@ -970,16 +960,6 @@ public class EntryLogger {
         }
     }
 
-    private void incrementBytesWrittenAndMaybeFlush(long bytesWritten) throws IOException {
-        if (!doRegularFlushes) {
-            return;
-        }
-        bytesWrittenSinceLastFlush += bytesWritten;
-        if (bytesWrittenSinceLastFlush > flushIntervalInBytes) {
-            flushCurrentLog();
-        }
-    }
-
     static long logIdForOffset(long offset) {
         return offset >> 32L;
     }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 146a83e..775219b 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -986,7 +986,7 @@ public class Journal extends BookieCriticalThread implements CheckpointSource {
                                 writePaddingBytes(logFile, paddingBuff, journalAlignmentSize);
                             }
                             journalFlushWatcher.reset().start();
-                            bc.flush(false);
+                            bc.flush();
 
                             for (int i = 0; i < toFlush.size(); i++) {
                                 QueueEntry entry = toFlush.get(i);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 9294e27..5133e78 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -171,7 +171,7 @@ public class BookieJournalTest {
 
     private static void moveToPosition(JournalChannel jc, long pos) throws IOException {
         jc.fc.position(pos);
-        jc.bc.position = pos;
+        jc.bc.position.set(pos);
         jc.bc.writeBufferStartPosition.set(pos);
     }
 
@@ -211,7 +211,7 @@ public class BookieJournalTest {
             bc.write(packet);
             packet.release();
         }
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
 
         updateJournalVersion(jc, JournalChannel.V2);
 
@@ -245,7 +245,7 @@ public class BookieJournalTest {
             bc.write(packet);
             packet.release();
         }
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
 
         updateJournalVersion(jc, JournalChannel.V3);
 
@@ -284,7 +284,7 @@ public class BookieJournalTest {
         lenBuf.writeInt(packet.readableBytes());
         bc.write(lenBuf);
         bc.write(packet);
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
         updateJournalVersion(jc, JournalChannel.V4);
         return jc;
     }
@@ -324,7 +324,7 @@ public class BookieJournalTest {
         bc.write(lenBuf);
         bc.write(packet);
         Journal.writePaddingBytes(jc, paddingBuff, JournalChannel.SECTOR_SIZE);
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
         updateJournalVersion(jc, JournalChannel.V5);
         return jc;
     }
@@ -521,7 +521,7 @@ public class BookieJournalTest {
 
         JournalChannel jc = writeV2Journal(Bookie.getCurrentDirectory(journalDir), 0);
         jc.getBufferedChannel().write(Unpooled.wrappedBuffer("JunkJunkJunk".getBytes()));
-        jc.getBufferedChannel().flush(true);
+        jc.getBufferedChannel().flushAndForceWrite(false);
 
         writeIndexFileForLedger(ledgerDir, 1, "testPasswd".getBytes());
 
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
new file mode 100644
index 0000000..86f3a86
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BufferedChannelTest.java
@@ -0,0 +1,134 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for BufferedChannel.
+ */
+public class BufferedChannelTest {
+
+    private static Random rand = new Random();
+    private static final int INTERNAL_BUFFER_WRITE_CAPACITY = 65536;
+    private static final int INTERNAL_BUFFER_READ_CAPACITY = 512;
+
+    @Test
+    public void testBufferedChannelWithNoBoundOnUnpersistedBytes() throws Exception {
+        testBufferedChannel(5000, 30, 0, false, false);
+    }
+
+    @Test
+    public void testBufferedChannelWithBoundOnUnpersistedBytes() throws Exception {
+        testBufferedChannel(5000, 30, 5000 * 28, false, false);
+    }
+
+    @Test
+    public void testBufferedChannelWithBoundOnUnpersistedBytesAndFlush() throws Exception {
+        testBufferedChannel(5000, 30, 5000 * 28, true, false);
+    }
+
+    @Test
+    public void testBufferedChannelFlushNoForceWrite() throws Exception {
+        testBufferedChannel(5000, 30, 0, true, false);
+    }
+
+    @Test
+    public void testBufferedChannelForceWriteNoFlush() throws Exception {
+        testBufferedChannel(5000, 30, 0, false, true);
+    }
+
+    @Test
+    public void testBufferedChannelFlushForceWrite() throws Exception {
+        testBufferedChannel(5000, 30, 0, true, true);
+    }
+
+    public void testBufferedChannel(int byteBufLength, int numOfWrites, int unpersistedBytesBound, boolean flush,
+            boolean shouldForceWrite) throws Exception {
+        File newLogFile = File.createTempFile("test", "log");
+        newLogFile.deleteOnExit();
+        FileChannel fileChannel = new RandomAccessFile(newLogFile, "rw").getChannel();
+
+        BufferedChannel logChannel = new BufferedChannel(fileChannel, INTERNAL_BUFFER_WRITE_CAPACITY,
+                INTERNAL_BUFFER_READ_CAPACITY, unpersistedBytesBound);
+
+        ByteBuf dataBuf = generateEntry(byteBufLength);
+        dataBuf.markReaderIndex();
+        dataBuf.markWriterIndex();
+
+        for (int i = 0; i < numOfWrites; i++) {
+            logChannel.write(dataBuf);
+            dataBuf.resetReaderIndex();
+            dataBuf.resetWriterIndex();
+        }
+
+        if (flush && shouldForceWrite) {
+            logChannel.flushAndForceWrite(false);
+        } else if (flush) {
+            logChannel.flush();
+        } else if (shouldForceWrite) {
+            logChannel.forceWrite(false);
+        }
+
+        int expectedNumOfUnpersistedBytes = 0;
+
+        if (flush && shouldForceWrite) {
+            /*
+             * if flush call is made with shouldForceWrite,
+             * then expectedNumOfUnpersistedBytes should be zero.
+             */
+            expectedNumOfUnpersistedBytes = 0;
+        } else if (!flush && shouldForceWrite) {
+            /*
+             * if flush is not called then internal write buffer is not flushed,
+             * but while adding entries to BufferedChannel if writeBuffer has
+             * reached its capacity then it will call flush method, and the data
+             * gets added to the file buffer. So though explicitly we are not
+             * calling flush method, implicitly flush gets called when
+             * writeBuffer reaches its capacity.
+             */
+            expectedNumOfUnpersistedBytes = (byteBufLength * numOfWrites) % INTERNAL_BUFFER_WRITE_CAPACITY;
+        } else {
+            expectedNumOfUnpersistedBytes = (byteBufLength * numOfWrites) - unpersistedBytesBound;
+        }
+
+        Assert.assertEquals("Unpersisted bytes", expectedNumOfUnpersistedBytes, logChannel.getUnpersistedBytes());
+        logChannel.close();
+        fileChannel.close();
+    }
+
+    private static ByteBuf generateEntry(int length) {
+        byte[] data = new byte[length];
+        ByteBuf bb = Unpooled.buffer(length);
+        rand.nextBytes(data);
+        bb.writeBytes(data);
+        return bb;
+    }
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
index 77f2453..b89a5b0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/UpgradeTest.java
@@ -105,7 +105,7 @@ public class UpgradeTest extends BookKeeperClusterTestCase {
             bc.write(packet);
             packet.release();
         }
-        bc.flush(true);
+        bc.flushAndForceWrite(false);
 
         return jc;
     }

-- 
To stop receiving notification emails like this one, please contact
ivank@apache.org.