You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2019/07/12 08:04:51 UTC

[bookkeeper] branch master updated: Issue #2103: Avoid stop of entry log compaction

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 1adc93f  Issue #2103: Avoid stop of entry log compaction
1adc93f is described below

commit 1adc93f05954fdb06d7e8b548d606e281831924a
Author: massakam <ma...@yahoo-corp.jp>
AuthorDate: Fri Jul 12 17:04:46 2019 +0900

    Issue #2103: Avoid stop of entry log compaction
    
    ### Motivation
    
    As mentioned in #2103, if an exception occurs during compaction of a specific entry log, `GarbageCollectorThread` does not perform compaction of other entry logs until the bookie server is restarted. As a result, the number of entry logs continues to increase and eventually it will run out of disk space.
    
    ### Changes
    
    The cause of the compaction stop is that the `compacting` flag remains true if `compactor.compact(entryLogMeta)` throws an exception.
    https://github.com/apache/bookkeeper/blob/b2e099bbc7b13f13825fe78ab009ca132cb3a9ba/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java#L504-L519
    
    Therefore, `GarbageCollectorThread` is fixed so that it sets the `compacting` flag back to false even if compaction of a specific entry log fails.
    
    Master Issue: #2103
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Sijie Guo <si...@apache.org>
    
    This closes #2121 from massakam/entry-log-compaction, closes #2103
---
 .../bookkeeper/bookie/GarbageCollectorThread.java  | 14 ++--
 .../bookie/GarbageCollectorThreadTest.java         | 81 ++++++++++++++++++++++
 2 files changed, 91 insertions(+), 4 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 0065926..811fbe6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -512,10 +512,16 @@ public class GarbageCollectorThread extends SafeRunnable {
             // indicates that compaction is in progress for this EntryLogId.
             return;
         }
-        // Do the actual compaction
-        compactor.compact(entryLogMeta);
-        // Mark compaction done
-        compacting.set(false);
+
+        try {
+            // Do the actual compaction
+            compactor.compact(entryLogMeta);
+        } catch (Exception e) {
+            LOG.error("Failed to compact entry log {} due to unexpected error", entryLogMeta.getEntryLogId(), e);
+        } finally {
+            // Mark compaction done
+            compacting.set(false);
+        }
     }
 
     /**
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
new file mode 100644
index 0000000..1b1f657
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/GarbageCollectorThreadTest.java
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertFalse;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.mockito.MockitoAnnotations.initMocks;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.powermock.reflect.Whitebox;
+
+/**
+ * Unit test for {@link GarbageCollectorThread}: a failed compaction must clear the compacting flag.
+ */
+public class GarbageCollectorThreadTest {
+
+    @InjectMocks
+    @Spy
+    private GarbageCollectorThread mockGCThread;
+
+    @Mock
+    private LedgerManager ledgerManager;
+    @Mock
+    private StatsLogger statsLogger;
+    @Mock
+    private ScheduledExecutorService gcExecutor;
+
+    private ServerConfiguration conf = spy(new ServerConfiguration());
+    private CompactableLedgerStorage ledgerStorage = mock(CompactableLedgerStorage.class);
+
+    @Before
+    public void setUp() throws Exception {
+        when(ledgerStorage.getEntryLogger()).thenReturn(mock(EntryLogger.class));
+        initMocks(this); // NOTE(review): presumably must run after the stub above - confirm
+    }
+
+    @Test
+    public void testCompactEntryLogWithException() throws Exception {
+        AbstractLogCompactor mockCompactor = mock(AbstractLogCompactor.class);
+        when(mockCompactor.compact(any(EntryLogMetadata.class)))
+                .thenThrow(new RuntimeException("Unexpected compaction error"));
+        Whitebox.setInternalState(mockGCThread, "compactor", mockCompactor);
+
+        // The compactor is stubbed to throw on every call, so compactEntryLog() hits the failure
+        // path; the `compacting` flag must be false both before the call and after it returns.
+        AtomicBoolean compacting = Whitebox.getInternalState(mockGCThread, "compacting");
+        assertFalse(compacting.get());
+        mockGCThread.compactEntryLog(new EntryLogMetadata(9999));
+        assertFalse(compacting.get());
+    }
+}