You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/11/06 23:11:34 UTC

[GitHub] yzang commented on a change in pull request #704: BOOKKEEPER-1040: Use separate log for compaction and add transaction support

yzang commented on a change in pull request #704: BOOKKEEPER-1040: Use separate log for compaction and add transaction support
URL: https://github.com/apache/bookkeeper/pull/704#discussion_r149231895
 
 

 ##########
 File path: bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/TransactionalEntryLogCompactor.java
 ##########
 @@ -0,0 +1,413 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+import org.apache.bookkeeper.bookie.EntryLogger.EntryLogScanner;
+import org.apache.bookkeeper.util.HardLink;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used for compaction. Compaction is done in several transactional phases.
+ * Phase 1: Scan old entry log and compact entries to a new .compacting log file.
+ * Phase 2: Flush .compacting log to disk and it becomes .compacted log file when this completes.
+ * Phase 3: Flush ledger cache and .compacted file becomes .log file when this completes. Remove old
+ * entry log file afterwards.
+ */
+public class TransactionalEntryLogCompactor extends AbstractLogCompactor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TransactionalEntryLogCompactor.class);
+
+    final EntryLogger entryLogger;
+    final CompactableLedgerStorage ledgerStorage;
+    final Throttler throttler;
+    final List<EntryLocation> offsets = new ArrayList<>();
+
+    // compaction log file suffix
+    static final String COMPACTING_SUFFIX = ".log.compacting";
+    // flushed compaction log file suffix
+    static final String COMPACTED_SUFFIX = ".compacted";
+
+    private class Throttler {
+        final RateLimiter rateLimiter;
+        final boolean isThrottleByBytes;
+        final int compactionRateByBytes;
+        final int compactionRateByEntries;
+
+        Throttler(boolean isThrottleByBytes,
+                  int compactionRateByBytes,
+                  int compactionRateByEntries) {
+            this.isThrottleByBytes = isThrottleByBytes;
+            this.compactionRateByBytes = compactionRateByBytes;
+            this.compactionRateByEntries = compactionRateByEntries;
+            this.rateLimiter = RateLimiter.create(this.isThrottleByBytes ?
+                this.compactionRateByBytes :
+                this.compactionRateByEntries);
+        }
+
+        // acquire. if bybytes: bytes of this entry; if byentries: 1.
+        void acquire(int permits) {
+            rateLimiter.acquire(this.isThrottleByBytes ? permits : 1);
+        }
+    }
+
+    public TransactionalEntryLogCompactor(GarbageCollectorThread gcThread) {
+        super(gcThread);
+        this.entryLogger = gcThread.getEntryLogger();
+        this.ledgerStorage = gcThread.getLedgerStorage();
+        boolean isThrottleByBytes = conf.getIsThrottleByBytes();
+        int compactionRateByEntries = conf.getCompactionRateByEntries();
+        int compactionRateByBytes = conf.getCompactionRateByBytes();
+        // Compaction Throttler
+        this.throttler = new Throttler(isThrottleByBytes,
+            compactionRateByBytes,
+            compactionRateByEntries);
+    }
+
+    /**
+     * Delete all previously incomplete compacting logs and recover the index for compacted logs.
+     */
+    public void cleanUpAndRecover() {
+        // clean up compacting logs and recover index for already compacted logs
+        List<File> ledgerDirs = entryLogger.getLedgerDirsManager().getAllLedgerDirs();
+        for (File dir : ledgerDirs) {
+            File[] compactingPhaseFiles = dir.listFiles(file -> file.getName().endsWith(COMPACTING_SUFFIX));
+            if (compactingPhaseFiles != null) {
+                for (File file : compactingPhaseFiles) {
+                    LOG.info("Deleting failed compaction file {}", file);
+                    file.delete();
+                }
+            }
+            File[] compactedPhaseFiles = dir.listFiles(file -> file.getName().endsWith(COMPACTED_SUFFIX));
+            if (compactedPhaseFiles != null) {
+                for (File compactedFile : compactedPhaseFiles) {
+                    LOG.info("Found compacted log file {} has partially flushed index, recovering index.", compactedFile);
+                    CompactionPhase updateIndex = new UpdateIndexPhase(compactedFile, true);
+                    updateIndex.run();
+                }
+            }
+        }
+    }
+
+    @Override
+    public boolean compact(EntryLogMetadata metadata) {
+        if (metadata != null) {
+            LOG.info("Compacting entry log {} with usage {}.",
+                new Object[]{metadata.getEntryLogId(), metadata.getUsage()});
+            CompactionPhase scanEntryLog = new ScanEntryLogPhase(metadata);
+            if (!scanEntryLog.run()) {
+                LOG.info("Compaction for entry log {} end in ScanEntryLogPhase.", metadata.getEntryLogId());
+                return false;
+            }
+            File compactionLogFile = entryLogger.getCurCompactionLogFile();
+            CompactionPhase flushCompactionLog = new FlushCompactionLogPhase(metadata.getEntryLogId());
+            if (!flushCompactionLog.run()) {
+                LOG.info("Compaction for entry log {} end in FlushCompactionLogPhase.", metadata.getEntryLogId());
+                return false;
+            }
+            File compactedLogFile = getCompactedLogFile(compactionLogFile, metadata.getEntryLogId());
+            CompactionPhase updateIndex = new UpdateIndexPhase(compactedLogFile);
+            if (!updateIndex.run()) {
+                LOG.info("Compaction for entry log {} end in UpdateIndexPhase.", metadata.getEntryLogId());
+                return false;
+            }
+            LOG.info("Compacted entry log : {}.", metadata.getEntryLogId());
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * An abstract class that would be extended to be the actual transactional phases for compaction
+     */
+    abstract static class CompactionPhase {
+        private String phaseName = "";
+
+        CompactionPhase(String phaseName) {
+            this.phaseName = phaseName;
+        }
+
+        boolean run() {
+            try {
+                start();
+                return complete();
+            } catch (IOException e) {
+                LOG.error("Encounter exception in compaction phase {}. Abort current compaction.", phaseName, e);
+                abort();
+            }
+            return false;
+        }
+
+        abstract void start() throws IOException;
+
+        abstract boolean complete() throws IOException;
+
+        abstract void abort();
+
+    }
+
+    /**
+     * Assume we're compacting entry log 1 to entry log 3.
+     * The first phase is to scan entries in 1.log and copy them to compaction log file "3.log.compacting".
+     * We'll try to allocate a new compaction log before scanning to make sure we have a log file to write.
+     * If after scanning, there's no data written, it means there's no valid entries to be compacted,
+     * so we can remove 1.log directly, clear the offsets and end the compaction.
+     * Otherwise, we should move on to the next phase.
+     * <p>
+     * If anything failed in this phase, we should delete the compaction log and clean the offsets.
+     */
+    class ScanEntryLogPhase extends CompactionPhase {
+        private EntryLogMetadata metadata;
+
+        ScanEntryLogPhase(EntryLogMetadata metadata) {
+            super("ScanEntryLogPhase");
+            this.metadata = metadata;
+        }
+
+        @Override
+        void start() throws IOException {
+            // scan entry log into compaction log and offset list
+            entryLogger.createNewCompactionLog();
+            entryLogger.scanEntryLog(metadata.getEntryLogId(), new EntryLogScanner() {
+                @Override
+                public boolean accept(long ledgerId) {
+                    return metadata.containsLedger(ledgerId);
+                }
+
+                @Override
+                public void process(long ledgerId, long offset, ByteBuffer entry) throws IOException {
+                    throttler.acquire(entry.remaining());
+                    synchronized (this) {
 
 Review comment:
   I'm thinking maybe we don't need synchronization here? Since we only have one thread doing GC and compaction anyway..

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services