Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/04/27 23:34:00 UTC

[GitHub] sijie closed pull request #1358: Issue #570: Introduce EntryMemTableWithParallelFlusher

sijie closed pull request #1358: Issue #570: Introduce EntryMemTableWithParallelFlusher
URL: https://github.com/apache/bookkeeper/pull/1358

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
index 50e93a3dc..70f437cb3 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTable.java
@@ -48,7 +48,7 @@
  * We continue to serve edits out of new EntrySkipList and backing snapshot until
  * flusher reports in that the flush succeeded. At that point we let the snapshot go.
  */
-public class EntryMemTable {
+public class EntryMemTable implements AutoCloseable {
     private static Logger logger = LoggerFactory.getLogger(Journal.class);
 
     /**
@@ -117,7 +117,7 @@ private EntrySkipList newSkipList() {
     private final OpStatsLogger snapshotStats;
     private final OpStatsLogger putEntryStats;
     private final OpStatsLogger getEntryStats;
-    private final Counter flushBytesCounter;
+    final Counter flushBytesCounter;
     private final Counter throttlingCounter;
 
     /**
@@ -232,10 +232,14 @@ public long flush(SkipListFlusher flusher, Checkpoint checkpoint) throws IOExcep
     }
 
     /**
-     * Flush snapshot and clear it iff its data is before checkpoint.
-     * Only this function change non-empty this.snapshot.
+     * Flush snapshot and clear it iff its data is before checkpoint. Only this
+     * function changes a non-empty this.snapshot.
+     *
+     * <p>EntryMemTableWithParallelFlusher overrides this flushSnapshot method, so
+     * any change in the functionality/behavior of this method should also be
+     * reflected in EntryMemTableWithParallelFlusher's flushSnapshot method.
      */
-    private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+    long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
         long size = 0;
         if (this.snapshot.compareTo(checkpoint) < 0) {
             long ledger, ledgerGC = -1;
@@ -268,7 +272,7 @@ private long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint)
      * @param keyValues The snapshot to clean out.
      * @see {@link #snapshot()}
      */
-    private void clearSnapshot(final EntrySkipList keyValues) {
+    void clearSnapshot(final EntrySkipList keyValues) {
         // Caller makes sure that keyValues not empty
         assert !keyValues.isEmpty();
         this.lock.writeLock().lock();
@@ -452,4 +456,9 @@ boolean isSizeLimitReached() {
     boolean isEmpty() {
         return size.get() == 0 && snapshot.isEmpty();
     }
+
+    @Override
+    public void close() throws Exception {
+        // no-op
+    }
 }
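
The change above makes EntryMemTable implement AutoCloseable (with a no-op close())
and relaxes flushSnapshot/clearSnapshot from private to package-private so that a
subclass can override them. SortedLedgerStorage, further down in this diff, closes
the memtable explicitly in shutdown(); as a hedged sketch (not code from this patch),
an owner that creates and releases a memtable within a single scope could instead
lean on try-with-resources, using only the constructor, flush and close signatures
visible in this diff:

    // Sketch only: conf, checkpointSource, statsLogger, flusher and checkpoint
    // are assumed to be in scope, and the enclosing method handles Exception.
    try (EntryMemTable memTable = conf.isEntryLogPerLedgerEnabled()
            ? new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger)
            : new EntryMemTable(conf, checkpointSource, statsLogger)) {
        long flushedBytes = memTable.flush(flusher, checkpoint);
    }
    // close() is a no-op for the base class; for EntryMemTableWithParallelFlusher
    // it shuts down the flush executor.
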
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
new file mode 100644
index 000000000..a3849e915
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/EntryMemTableWithParallelFlusher.java
@@ -0,0 +1,154 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.OrderedExecutor;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.SafeRunnable;
+
+/**
+ * EntryMemTableWithParallelFlusher.
+ */
+@Slf4j
+class EntryMemTableWithParallelFlusher extends EntryMemTable {
+
+    final OrderedExecutor flushExecutor;
+
+    public EntryMemTableWithParallelFlusher(final ServerConfiguration conf, final CheckpointSource source,
+            final StatsLogger statsLogger) {
+        super(conf, source, statsLogger);
+        this.flushExecutor = OrderedExecutor.newBuilder().numThreads(conf.getNumOfMemtableFlushThreads())
+                .name("MemtableFlushThreads").build();
+    }
+
+    /**
+     * Functionally this overridden flushSnapshot does the same as
+     * EntryMemTable's flushSnapshot, but it uses the flushExecutor
+     * (an OrderedExecutor) to process entries through the flusher.
+     *
+     * <p>SubMaps of the snapshot, each corresponding to the entries of one
+     * ledger, are created and submitted to the flushExecutor with the ledgerId
+     * as the ordering key, so each ledger's entries are flushed as one task.
+     */
+    @Override
+    long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+        AtomicLong flushedSize = new AtomicLong();
+        if (this.snapshot.compareTo(checkpoint) < 0) {
+            synchronized (this) {
+                EntrySkipList keyValues = this.snapshot;
+
+                Phaser pendingNumOfLedgerFlushes = new Phaser(1);
+                AtomicReference<Exception> exceptionWhileFlushingParallelly = new AtomicReference<Exception>();
+
+                if (keyValues.compareTo(checkpoint) < 0) {
+
+                    Map.Entry<EntryKey, EntryKeyValue> thisLedgerFirstMapEntry = keyValues.firstEntry();
+                    EntryKeyValue thisLedgerFirstEntry;
+                    long thisLedgerId;
+
+                    while (thisLedgerFirstMapEntry != null) {
+                        thisLedgerFirstEntry = thisLedgerFirstMapEntry.getValue();
+                        thisLedgerId = thisLedgerFirstEntry.getLedgerId();
+                        EntryKey thisLedgerCeilingKeyMarker = new EntryKey(thisLedgerId, Long.MAX_VALUE - 1);
+                        /*
+                         * Gets a view of the portion of this map that
+                         * corresponds to entries of this ledger.
+                         */
+                        ConcurrentNavigableMap<EntryKey, EntryKeyValue> thisLedgerEntries = keyValues
+                                .subMap(thisLedgerFirstEntry, thisLedgerCeilingKeyMarker);
+                        pendingNumOfLedgerFlushes.register();
+                        flushExecutor.executeOrdered(thisLedgerId, new SafeRunnable() {
+                            @Override
+                            public void safeRun() {
+                                try {
+                                    long ledger;
+                                    boolean ledgerDeleted = false;
+                                    for (EntryKey key : thisLedgerEntries.keySet()) {
+                                        EntryKeyValue kv = (EntryKeyValue) key;
+                                        flushedSize.addAndGet(kv.getLength());
+                                        ledger = kv.getLedgerId();
+                                        if (!ledgerDeleted) {
+                                            try {
+                                                flusher.process(ledger, kv.getEntryId(), kv.getValueAsByteBuffer());
+                                            } catch (NoLedgerException exception) {
+                                                ledgerDeleted = true;
+                                            }
+                                        }
+                                    }
+                                    pendingNumOfLedgerFlushes.arriveAndDeregister();
+                                } catch (Exception exc) {
+                                    log.error("Got Exception while trying to flush entries: ", exc);
+                                    exceptionWhileFlushingParallelly.set(exc);
+                                    /*
+                                     * If we get any unexpected exception while
+                                     * trying to flush the entries of a
+                                     * ledger, then force terminate the
+                                     * pendingNumOfLedgerFlushes phaser.
+                                     */
+                                    pendingNumOfLedgerFlushes.forceTermination();
+                                }
+                            }
+                        });
+                        thisLedgerFirstMapEntry = keyValues.ceilingEntry(thisLedgerCeilingKeyMarker);
+                    }
+
+                    boolean phaserTerminatedAbruptly = false;
+                    try {
+                        /*
+                         * If flushing the entries of a ledger failed because
+                         * of an unexpected exception, the
+                         * pendingNumOfLedgerFlushes phaser would have been
+                         * force terminated, and in that case
+                         * arriveAndAwaitAdvance returns a negative value.
+                         */
+                        phaserTerminatedAbruptly = (pendingNumOfLedgerFlushes.arriveAndAwaitAdvance() < 0);
+                    } catch (IllegalStateException ise) {
+                        log.error("Got IllegalStateException while awaiting on Phaser", ise);
+                        throw new IOException("Got IllegalStateException while awaiting on Phaser", ise);
+                    }
+                    if (phaserTerminatedAbruptly) {
+                        log.error("Phaser is terminated while awaiting flushExecutor to complete the entry flushes",
+                                exceptionWhileFlushingParallelly.get());
+                        throw new IOException("Failed to complete the parallel flushSnapshot",
+                                exceptionWhileFlushingParallelly.get());
+                    }
+                    flushBytesCounter.add(flushedSize.get());
+                    clearSnapshot(keyValues);
+                }
+            }
+        }
+        return flushedSize.longValue();
+    }
+
+    @Override
+    public void close() throws Exception {
+        flushExecutor.shutdown();
+    }
+}
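
The coordination in the class above is a Phaser fan-out: the flushing thread holds
one registered party (new Phaser(1)), registers one additional party per submitted
per-ledger task, and then blocks in arriveAndAwaitAdvance() until every task has
called arriveAndDeregister(); a failed task records its exception and calls
forceTermination(), which makes arriveAndAwaitAdvance() return a negative value.
A minimal, self-contained sketch of the same coordination pattern, using a plain
ExecutorService instead of BookKeeper's OrderedExecutor and hypothetical batch
names, might look like this:

    import java.util.Arrays;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.atomic.AtomicReference;

    public class PhaserFanOutSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService executor = Executors.newFixedThreadPool(4);
            Phaser pending = new Phaser(1);            // one party for the coordinating thread
            AtomicReference<Exception> failure = new AtomicReference<>();

            for (String batch : Arrays.asList("ledger-1", "ledger-2", "ledger-3")) {
                pending.register();                    // one party per submitted batch
                executor.execute(() -> {
                    try {
                        System.out.println("flushing " + batch);   // stand-in for the real flush work
                        pending.arriveAndDeregister(); // signal successful completion
                    } catch (Exception e) {
                        failure.set(e);
                        pending.forceTermination();    // wake the coordinator immediately
                    }
                });
            }

            // Returns a negative value iff the phaser was force-terminated by a worker.
            boolean terminatedAbruptly = pending.arriveAndAwaitAdvance() < 0;
            executor.shutdown();
            if (terminatedAbruptly) {
                throw new Exception("parallel flush failed", failure.get());
            }
        }
    }
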
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
index f2efa551a..815c65e18 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/SortedLedgerStorage.java
@@ -72,7 +72,11 @@ public void initialize(ServerConfiguration conf,
             checkpointSource,
             checkpointer,
             statsLogger);
-        this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
+        if (conf.isEntryLogPerLedgerEnabled()) {
+            this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger);
+        } else {
+            this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger);
+        }
         this.scheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder()
                 .setNameFormat("SortedLedgerStorage-%d")
@@ -102,6 +106,11 @@ public void shutdown() throws InterruptedException {
         if (!scheduler.awaitTermination(3, TimeUnit.SECONDS)) {
             scheduler.shutdownNow();
         }
+        try {
+            memTable.close();
+        } catch (Exception e) {
+            LOG.error("Error while closing the memtable", e);
+        }
         super.shutdown();
     }
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 785b35a75..bdc26e767 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -185,6 +185,9 @@
      * config specifying if the entrylog per ledger is enabled or not.
      */
     protected static final String ENTRY_LOG_PER_LEDGER_ENABLED = "entryLogPerLedgerEnabled";
+    // When multiple entry logs are used, multiple threads can be used to flush the memtable in parallel.
+    protected static final String NUMBER_OF_MEMTABLE_FLUSH_THREADS = "numOfMemtableFlushThreads";
+
 
     /**
      * Construct a default configuration object.
@@ -2723,4 +2726,22 @@ public ServerConfiguration setEntryLogPerLedgerEnabled(boolean entryLogPerLedger
         this.setProperty(ENTRY_LOG_PER_LEDGER_ENABLED, Boolean.toString(entryLogPerLedgerEnabled));
         return this;
     }
+
+    /*
+     * When multiple entry logs are used, multiple threads can be used to flush the memtable.
+     *
+     * Gets the number of threads used to flush the entry memtable.
+     */
+    public int getNumOfMemtableFlushThreads() {
+        return this.getInt(NUMBER_OF_MEMTABLE_FLUSH_THREADS, 8);
+    }
+
+    /*
+     * Sets the number of threads used to flush the entry memtable, in the case
+     * of multiple entry logs.
+     */
+    public ServerConfiguration setNumOfMemtableFlushThreads(int numOfMemtableFlushThreads) {
+        this.setProperty(NUMBER_OF_MEMTABLE_FLUSH_THREADS, Integer.toString(numOfMemtableFlushThreads));
+        return this;
+    }
 }
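
The two accessors above are the only new configuration surface: numOfMemtableFlushThreads
(default 8) sizes the OrderedExecutor used by EntryMemTableWithParallelFlusher, and it only
takes effect when entry log per ledger is enabled, since SortedLedgerStorage otherwise builds
a plain EntryMemTable. A hedged fragment showing how the knob would be wired (the value 16
is an arbitrary example):

    import org.apache.bookkeeper.conf.ServerConfiguration;

    ServerConfiguration conf = new ServerConfiguration();
    conf.setEntryLogPerLedgerEnabled(true);   // parallel flusher is chosen only when this is enabled
    conf.setNumOfMemtableFlushThreads(16);    // threads in the memtable flush OrderedExecutor
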
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
index 21da95118..ed8b43d38 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/EntryLogTest.java
@@ -510,20 +510,64 @@ public Boolean call() throws IOException {
     }
 
     /**
-     * test concurrent write operations and then concurrent read
-     * operations using InterleavedLedgerStorage.
+     * test concurrent write operations and then concurrent read operations
+     * using InterleavedLedgerStorage.
      */
     @Test
     public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorage() throws Exception {
+        testConcurrentWriteAndReadCalls(InterleavedLedgerStorage.class.getName(), false);
+    }
+
+    /**
+     * test concurrent write operations and then concurrent read operations
+     * using InterleavedLedgerStorage with EntryLogPerLedger enabled.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfInterleavedLedgerStorageWithELPLEnabled() throws Exception {
+        testConcurrentWriteAndReadCalls(InterleavedLedgerStorage.class.getName(), true);
+    }
+
+    /**
+     * test concurrent write operations and then concurrent read operations
+     * using SortedLedgerStorage.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfSortedLedgerStorage() throws Exception {
+        testConcurrentWriteAndReadCalls(SortedLedgerStorage.class.getName(), false);
+    }
+
+    /**
+     * test concurrent write operations and then concurrent read operations
+     * using SortedLedgerStorage with EntryLogPerLedger enabled.
+     */
+    @Test
+    public void testConcurrentWriteAndReadCallsOfSortedLedgerStorageWithELPLEnabled() throws Exception {
+        testConcurrentWriteAndReadCalls(SortedLedgerStorage.class.getName(), true);
+    }
+
+    public void testConcurrentWriteAndReadCalls(String ledgerStorageClass, boolean entryLogPerLedgerEnabled)
+            throws Exception {
         File ledgerDir = createTempDir("bkTest", ".dir");
         ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
         conf.setJournalDirName(ledgerDir.toString());
         conf.setLedgerDirNames(new String[] { ledgerDir.getAbsolutePath()});
-        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
+        conf.setLedgerStorageClass(ledgerStorageClass);
+        conf.setEntryLogPerLedgerEnabled(entryLogPerLedgerEnabled);
         Bookie bookie = new Bookie(conf);
         InterleavedLedgerStorage ledgerStorage = ((InterleavedLedgerStorage) bookie.ledgerStorage);
         Random rand = new Random(0);
 
+        if (ledgerStorageClass.equals(SortedLedgerStorage.class.getName())) {
+            Assert.assertEquals("LedgerStorage Class", SortedLedgerStorage.class, ledgerStorage.getClass());
+            if (entryLogPerLedgerEnabled) {
+                Assert.assertEquals("MemTable Class", EntryMemTableWithParallelFlusher.class,
+                        ((SortedLedgerStorage) ledgerStorage).memTable.getClass());
+            } else {
+                Assert.assertEquals("MemTable Class", EntryMemTable.class,
+                        ((SortedLedgerStorage) ledgerStorage).memTable.getClass());
+            }
+        }
+
         int numOfLedgers = 70;
         int numEntries = 1500;
         // Create ledgers
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
index 1c512093e..ae3b4cdcc 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java
@@ -37,7 +37,10 @@
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.FileInfoBackingCache.CachedFileInfo;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -451,19 +454,32 @@ public void run() {
     static class FlushTestSortedLedgerStorage extends SortedLedgerStorage {
         final AtomicBoolean injectMemTableSizeLimitReached;
         final AtomicBoolean injectFlushException;
+        final AtomicLong injectFlushExceptionForLedger;
+        final AtomicInteger numOfTimesFlushSnapshotCalled = new AtomicInteger(0);
+        static final long FORALLLEDGERS = -1;
 
         public FlushTestSortedLedgerStorage() {
             super();
             injectMemTableSizeLimitReached = new AtomicBoolean();
             injectFlushException = new AtomicBoolean();
+            injectFlushExceptionForLedger = new AtomicLong(FORALLLEDGERS);
         }
 
         public void setInjectMemTableSizeLimitReached(boolean setValue) {
             injectMemTableSizeLimitReached.set(setValue);
         }
 
-        public void setInjectFlushException(boolean setValue) {
+        public void setInjectFlushException(boolean setValue, long ledgerId) {
             injectFlushException.set(setValue);
+            injectFlushExceptionForLedger.set(ledgerId);
+        }
+
+        public void incrementNumOfTimesFlushSnapshotCalled() {
+            numOfTimesFlushSnapshotCalled.incrementAndGet();
+        }
+
+        public int getNumOfTimesFlushSnapshotCalled() {
+            return numOfTimesFlushSnapshotCalled.get();
         }
 
         @Override
@@ -484,21 +500,44 @@ public void initialize(ServerConfiguration conf,
                 checkpointSource,
                 checkpointer,
                 statsLogger);
-            this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger) {
-                @Override
-                boolean isSizeLimitReached() {
-                    return (injectMemTableSizeLimitReached.get() || super.isSizeLimitReached());
-                }
-            };
+            if (this.memTable instanceof EntryMemTableWithParallelFlusher) {
+                this.memTable = new EntryMemTableWithParallelFlusher(conf, checkpointSource, statsLogger) {
+                    @Override
+                    boolean isSizeLimitReached() {
+                        return (injectMemTableSizeLimitReached.get() || super.isSizeLimitReached());
+                    }
+
+                    @Override
+                    long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+                        incrementNumOfTimesFlushSnapshotCalled();
+                        return super.flushSnapshot(flusher, checkpoint);
+                    }
+                };
+            } else {
+                this.memTable = new EntryMemTable(conf, checkpointSource, statsLogger) {
+                    @Override
+                    boolean isSizeLimitReached() {
+                        return (injectMemTableSizeLimitReached.get() || super.isSizeLimitReached());
+                    }
+
+                    @Override
+                    long flushSnapshot(final SkipListFlusher flusher, Checkpoint checkpoint) throws IOException {
+                        incrementNumOfTimesFlushSnapshotCalled();
+                        return super.flushSnapshot(flusher, checkpoint);
+                    }
+                };
+            }
         }
 
         @Override
         public void process(long ledgerId, long entryId, ByteBuf buffer) throws IOException {
-            if (injectFlushException.get()) {
+            if (injectFlushException.get() && ((injectFlushExceptionForLedger.get() == FORALLLEDGERS)
+                    || (injectFlushExceptionForLedger.get() == ledgerId))) {
                 throw new IOException("Injected Exception");
             }
             super.process(ledgerId, entryId, buffer);
         }
+
         // simplified memTable full callback.
         @Override
         public void onSizeLimitReached(final CheckpointSource.Checkpoint cp) throws IOException {
@@ -542,19 +581,18 @@ public void testEntryMemTableFlushFailure() throws Exception {
 
         // set flags, so that FlushTestSortedLedgerStorage simulates FlushFailure scenario
         flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
-        flushTestSortedLedgerStorage.setInjectFlushException(true);
+        flushTestSortedLedgerStorage.setInjectFlushException(true, FlushTestSortedLedgerStorage.FORALLLEDGERS);
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
-        Thread.sleep(1000);
 
         // since we simulated sizeLimitReached, snapshot shouldn't be empty
         assertFalse("EntryMemTable SnapShot is not expected to be empty", memTable.snapshot.isEmpty());
+        assertEquals("Flusher called", 1, flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
 
         // set the flags to false, so flush will succeed this time
         flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(false);
-        flushTestSortedLedgerStorage.setInjectFlushException(false);
+        flushTestSortedLedgerStorage.setInjectFlushException(false, FlushTestSortedLedgerStorage.FORALLLEDGERS);
 
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 3));
-        Thread.sleep(1000);
         // since we expect memtable flush to succeed, memtable snapshot should be empty
         assertTrue("EntryMemTable SnapShot is expected to be empty, because of successful flush",
                 memTable.snapshot.isEmpty());
@@ -586,7 +624,7 @@ public void testSortedLedgerFlushFailure() throws Exception {
 
         // set flags, so that FlushTestSortedLedgerStorage simulates FlushFailure scenario
         flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
-        flushTestSortedLedgerStorage.setInjectFlushException(true);
+        flushTestSortedLedgerStorage.setInjectFlushException(true, FlushTestSortedLedgerStorage.FORALLLEDGERS);
         flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
 
         // since we simulated sizeLimitReached, snapshot shouldn't be empty
@@ -613,4 +651,104 @@ private ByteBuf generateEntry(long ledger, long entry) {
         bb.writeBytes(data);
         return bb;
     }
+
+    @Test
+    public void testEntryMemTableParallelFlush() throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(1));
+        conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+        // enable entrylog per ledger
+        conf.setEntryLogPerLedgerEnabled(true);
+
+        Bookie bookie = new Bookie(conf);
+        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+        /*
+         * These bookie.addEntry calls are required: they create the FileInfo
+         * for ledgers 1, 2 and 3. Without the FileInfo, the
+         * 'flushTestSortedLedgerStorage.addEntry' calls will fail because of
+         * the BOOKKEEPER-965 change.
+         */
+        bookie.addEntry(generateEntry(1, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(2, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(3, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 2));
+        flushTestSortedLedgerStorage.addEntry(generateEntry(2, 2));
+        flushTestSortedLedgerStorage.addEntry(generateEntry(3, 2));
+
+        assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
+        assertFalse("EntryMemTable is not expected to be empty", memTable.isEmpty());
+
+        // inject MemTableSizeLimitReached, so entrymemtable will be flushed
+        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 3));
+
+        // since we simulated sizeLimitReached, snapshot should have been created and flushed
+        assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
+        assertEquals("Flusher called", 1, flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
+    }
+
+    @Test
+    public void testEntryMemTableParallelFlushWithFlushException() throws Exception {
+        int gcWaitTime = 1000;
+        ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setLedgerDirNames(createAndGetLedgerDirs(1));
+        conf.setLedgerStorageClass(FlushTestSortedLedgerStorage.class.getName());
+        // enable entrylog per ledger
+        conf.setEntryLogPerLedgerEnabled(true);
+
+        Bookie bookie = new Bookie(conf);
+        FlushTestSortedLedgerStorage flushTestSortedLedgerStorage = (FlushTestSortedLedgerStorage) bookie.ledgerStorage;
+        EntryMemTable memTable = flushTestSortedLedgerStorage.memTable;
+
+        /*
+         * These bookie.addEntry calls are required: they create the FileInfo
+         * for ledgers 1, 2 and 3. Without the FileInfo, the
+         * 'flushTestSortedLedgerStorage.addEntry' calls will fail because of
+         * the BOOKKEEPER-965 change.
+         */
+        bookie.addEntry(generateEntry(1, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(2, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+        bookie.addEntry(generateEntry(3, 1), false, new Bookie.NopWriteCallback(), null, "passwd".getBytes());
+
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 4));
+        flushTestSortedLedgerStorage.addEntry(generateEntry(2, 4));
+        flushTestSortedLedgerStorage.addEntry(generateEntry(3, 4));
+
+        // inject MemTableSizeLimitReached and FlushException, so entrymemtable flush will fail
+        flushTestSortedLedgerStorage.setInjectMemTableSizeLimitReached(true);
+        flushTestSortedLedgerStorage.setInjectFlushException(true, 1L);
+
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 5));
+        // since we simulate FlushException, memtable snapshot should not be empty
+        assertFalse("EntryMemTable SnapShot is not expected to be empty", memTable.snapshot.isEmpty());
+        assertEquals("Flusher called", 1, flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
+
+        flushTestSortedLedgerStorage.setInjectFlushException(false, FlushTestSortedLedgerStorage.FORALLLEDGERS);
+        flushTestSortedLedgerStorage.addEntry(generateEntry(1, 5));
+        /*
+         * since MemTableSizeLimitReached is already set to true, and flush
+         * exception is disabled, this time memtable snapshot should be flushed
+         */
+        assertTrue("EntryMemTable SnapShot is expected to be empty", memTable.snapshot.isEmpty());
+        assertEquals("Flusher called", 2, flushTestSortedLedgerStorage.getNumOfTimesFlushSnapshotCalled());
+    }
+
+    String[] createAndGetLedgerDirs(int numOfLedgerDirs) throws IOException {
+        File ledgerDir;
+        File curDir;
+        String[] ledgerDirsPath = new String[numOfLedgerDirs];
+        for (int i = 0; i < numOfLedgerDirs; i++) {
+            ledgerDir = createTempDir("bkTest", ".dir");
+            curDir = Bookie.getCurrentDirectory(ledgerDir);
+            Bookie.checkDirectoryStructure(curDir);
+            ledgerDirsPath[i] = ledgerDir.getAbsolutePath();
+        }
+        return ledgerDirsPath;
+    }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
index 50844f3d5..68e3eeba9 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestEntryMemTable.java
@@ -27,25 +27,46 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
+import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
 /**
  * Test the EntryMemTable class.
  */
+@RunWith(Parameterized.class)
 public class TestEntryMemTable implements CacheCallback, SkipListFlusher, CheckpointSource {
 
+    private Class entryMemTableClass;
     private EntryMemTable memTable;
     private final Random random = new Random();
     private TestCheckPoint curCheckpoint = new TestCheckPoint(0, 0);
 
+    @Parameters
+    public static Collection<Object[]> memTableClass() {
+        return Arrays.asList(new Object[][] { { EntryMemTable.class }, { EntryMemTableWithParallelFlusher.class } });
+    }
+
+    public TestEntryMemTable(Class entryMemTableClass) {
+        this.entryMemTableClass = entryMemTableClass;
+    }
+
     @Override
     public Checkpoint newCheckpoint() {
         return curCheckpoint;
@@ -58,8 +79,18 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact)
 
     @Before
     public void setUp() throws Exception {
-        this.memTable = new EntryMemTable(TestBKConfiguration.newServerConfiguration(),
-                this, NullStatsLogger.INSTANCE);
+        if (entryMemTableClass.equals(EntryMemTableWithParallelFlusher.class)) {
+            ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
+            this.memTable = new EntryMemTableWithParallelFlusher(conf, this, NullStatsLogger.INSTANCE);
+        } else {
+            this.memTable = new EntryMemTable(TestBKConfiguration.newServerConfiguration(), this,
+                    NullStatsLogger.INSTANCE);
+        }
+    }
+
+    @After
+    public void cleanup() throws Exception {
+        this.memTable.close();
     }
 
     @Test
@@ -134,9 +165,9 @@ public void testScanAcrossSnapshot() throws IOException {
     }
 
     private class KVFLusher implements SkipListFlusher {
-        final HashSet<EntryKeyValue> keyValues;
+        final Set<EntryKeyValue> keyValues;
 
-        KVFLusher(final HashSet<EntryKeyValue> keyValues) {
+        KVFLusher(final Set<EntryKeyValue> keyValues) {
             this.keyValues = keyValues;
         }
 
@@ -160,7 +191,7 @@ public void process(long ledgerId, long entryId, ByteBuf entry) throws IOExcepti
      */
     @Test
     public void testFlushLogMark() throws IOException {
-        HashSet<EntryKeyValue> flushedKVs = new HashSet<EntryKeyValue>();
+        Set<EntryKeyValue> flushedKVs = Collections.newSetFromMap(new ConcurrentHashMap<EntryKeyValue, Boolean>());
         KVFLusher flusher = new KVFLusher(flushedKVs);
 
         curCheckpoint.setCheckPoint(2, 2);
@@ -195,7 +226,7 @@ public void testFlushLogMark() throws IOException {
     @Test
     public void testFlushSnapshot() throws IOException {
         HashSet<EntryKeyValue> keyValues = new HashSet<EntryKeyValue>();
-        HashSet<EntryKeyValue> flushedKVs = new HashSet<EntryKeyValue>();
+        Set<EntryKeyValue> flushedKVs = Collections.newSetFromMap(new ConcurrentHashMap<EntryKeyValue, Boolean>());
         KVFLusher flusher = new KVFLusher(flushedKVs);
 
         byte[] data = new byte[10];
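
One detail worth calling out in the test changes above: KVFLusher's backing collection
switches from HashSet to a Set built over ConcurrentHashMap, because the parallel flusher
invokes SkipListFlusher.process(...) from several executor threads at once, so the
collecting set must be thread-safe. A small sketch of that collection (the newKeySet()
form is an equivalent Java 8+ shorthand, mentioned as an aside rather than taken from
the patch):

    // Thread-safe set for values collected from concurrent flush callbacks.
    Set<EntryKeyValue> flushedKVs =
            Collections.newSetFromMap(new ConcurrentHashMap<EntryKeyValue, Boolean>());
    // Equivalent since Java 8:
    Set<EntryKeyValue> flushedKVsAlt = ConcurrentHashMap.newKeySet();
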
diff --git a/conf/bk_server.conf b/conf/bk_server.conf
index 946e6af58..ad73a01ef 100755
--- a/conf/bk_server.conf
+++ b/conf/bk_server.conf
@@ -420,6 +420,9 @@ ledgerDirectories=/tmp/bk-data
 # number of active ledgers are written to a bookie.
 # entryLogPerLedgerEnabled=false
 
+# When multiple entry logs are used, multiple threads can be used to flush the memtable
+# numOfMemtableFlushThreads=8
+
 #############################################################################
 ## Entry log compaction settings
 #############################################################################
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index fd3ab7ad1..481d82826 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -409,6 +409,10 @@ groups:
     description: |
      How many index pages are provided in the ledger cache. If the number of index pages reaches this limit, the bookie server starts to swap some ledgers from memory to disk. You can increase this value if swapping becomes more frequent, but make sure pageLimit*pageSize is not more than the JVM max memory limit, otherwise you will get an OutOfMemoryException. In general, increasing pageLimit and using smaller index pages gives better performance when there are a large number of ledgers with fewer entries. If pageLimit is -1, the bookie server will use 1/3 of the JVM memory to compute the limit on the number of index pages.
     default: "-1"
+  - param: numOfMemtableFlushThreads
+    description: |
+      When entryLogPerLedger is enabled, SortedLedgerStorage flushes entries from the memtable using an OrderedExecutor with numOfMemtableFlushThreads threads.
+    default: 8
 
 - name: DB Ledger Storage Settings
   params:


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services