You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2018/08/15 07:40:26 UTC

[GitHub] sijie closed pull request #851: Issue 578 : make MajorCompaction controlled by time of the day/day of the week

sijie closed pull request #851: Issue 578 : make MajorCompaction controlled by time of the day/day of the week
URL: https://github.com/apache/bookkeeper/pull/851
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/bookkeeper-server/conf/bk_server.conf b/bookkeeper-server/conf/bk_server.conf
index 7c82de287c..5f5d5ef6b3 100755
--- a/bookkeeper-server/conf/bk_server.conf
+++ b/bookkeeper-server/conf/bk_server.conf
@@ -469,9 +469,13 @@ ledgerManagerFactoryClass=org.apache.bookkeeper.meta.HierarchicalLedgerManagerFa
 # this threshold will be compacted in a major compaction.
 # Those entry log files whose remaining size percentage is still
 # higher than the threshold will never be compacted.
-# If it is set to less than zero, the minor compaction is disabled.
+# If it is set to less than zero, the major compaction is disabled.
 # majorCompactionThreshold=0.8
 
+# When scheduleMajorCompactionCron is set to a valid Quartz cron expression, major compaction is executed
+# when the compaction time falls within the times it matches, e.g. "0 * 0-3 ? * SAT,SUN" (00:00-03:59 on weekends).
+#scheduleMajorCompactionCron=
+
 # Interval to run major compaction, in seconds
 # If it is set to less than zero, the major compaction is disabled.
 # majorCompactionInterval=86400
diff --git a/bookkeeper-server/pom.xml b/bookkeeper-server/pom.xml
index f0721b6aba..464a85cc6d 100644
--- a/bookkeeper-server/pom.xml
+++ b/bookkeeper-server/pom.xml
@@ -209,6 +209,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.quartz-scheduler</groupId>
+      <artifactId>quartz</artifactId>
+      <version>2.2.1</version>
+    </dependency>
   </dependencies>
   <build>
     <plugins>
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CronBasedCompactionJob.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CronBasedCompactionJob.java
new file mode 100644
index 0000000000..7758e21fa1
--- /dev/null
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/CronBasedCompactionJob.java
@@ -0,0 +1,53 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import org.quartz.Job;
+import org.quartz.JobDataMap;
+import org.quartz.JobExecutionContext;
+import org.quartz.JobExecutionException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Quartz job that calls {@code safeRun()} on the GarbageCollectorThread stored under the "gcThread" job data key.
+ */
+public class CronBasedCompactionJob implements Job {
+    private static final Logger LOG = LoggerFactory.getLogger(CronBasedCompactionJob.class);
+    public GarbageCollectorThread garbageCollectorThread;
+    @Override
+    public void execute(JobExecutionContext context) throws JobExecutionException {
+        JobDataMap dataMap = context.getJobDetail().getJobDataMap();
+        garbageCollectorThread = (GarbageCollectorThread) dataMap.get("gcThread");
+        garbageCollectorThread.safeRun();
+        if (LOG.isDebugEnabled()) {
+            String jobName = context.getJobDetail().getKey().getName();
+            // Quote literal words: unquoted letters such as 'o' or 'i' are illegal pattern characters.
+            SimpleDateFormat dateFormat = new SimpleDateFormat("dd/MM/yyyy HH 'hour' mm 'minute' ss 'second'");
+            String jobRunTime = dateFormat.format(Calendar.getInstance().getTime());
+            LOG.debug("Job {} executed on {}", jobName, jobRunTime);
+        }
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
index 4880fa3547..cf3c252d76 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/GarbageCollectorThread.java
@@ -28,11 +28,15 @@
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_COMPACTION_SPACE_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.RECLAIMED_DELETION_SPACE_BYTES;
 import static org.apache.bookkeeper.bookie.BookKeeperServerStats.THREAD_RUNTIME;
+import static org.quartz.CronScheduleBuilder.cronSchedule;
+import static org.quartz.JobBuilder.newJob;
+import static org.quartz.TriggerBuilder.newTrigger;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.util.concurrent.DefaultThreadFactory;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
@@ -54,6 +58,11 @@
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.bookkeeper.util.SafeRunnable;
+import org.quartz.CronTrigger;
+import org.quartz.JobDetail;
+import org.quartz.Scheduler;
+import org.quartz.SchedulerException;
+import org.quartz.impl.StdSchedulerFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,14 +70,16 @@
  * This is the garbage collector thread that runs in the background to
  * remove any entry log files that no longer contains any active ledger.
  */
-public class GarbageCollectorThread extends SafeRunnable {
+public class GarbageCollectorThread extends SafeRunnable implements Serializable{
     private static final Logger LOG = LoggerFactory.getLogger(GarbageCollectorThread.class);
     private static final int SECOND = 1000;
+    private static final long serialVersionUID = 8831216697587016042L;
 
     // Maps entry log files to the set of ledgers that comprise the file and the size usage per ledger
     private Map<Long, EntryLogMetadata> entryLogMetaMap = new ConcurrentHashMap<Long, EntryLogMetadata>();
 
     private final ScheduledExecutorService gcExecutor;
+    private Scheduler sched;
     Future<?> scheduledFuture = null;
 
     // This is how often we want to run the Garbage Collector Thread (in milliseconds).
@@ -119,6 +130,9 @@
     final AtomicBoolean suspendMajorCompaction = new AtomicBoolean(false);
     // Boolean to disable minor compaction, when disk is full
     final AtomicBoolean suspendMinorCompaction = new AtomicBoolean(false);
+    // Boolean to indicate whether CronBasedCompactionSchedule is on
+    final AtomicBoolean enableCronBasedCompactionSchedule = new AtomicBoolean(true);
+    final String scheduleMajorCompactionCron;
 
     final GarbageCollector garbageCollector;
     final GarbageCleaner garbageCleaner;
@@ -139,6 +153,15 @@ public GarbageCollectorThread(ServerConfiguration conf,
                                   StatsLogger statsLogger)
         throws IOException {
         gcExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("GarbageCollectorThread"));
+        try {
+            sched = new StdSchedulerFactory().getScheduler();
+        } catch (SchedulerException se) {
+            if (se.getUnderlyingException() instanceof IOException) {
+                throw (IOException) se.getUnderlyingException();
+            } else {
+                enableCronBasedCompactionSchedule.set(false);
+            }
+        }
         this.conf = conf;
 
         this.entryLogger = ledgerStorage.getEntryLogger();
@@ -243,6 +266,10 @@ public Long getSample() {
                + majorCompactionThreshold + ", interval=" + majorCompactionInterval);
 
         lastMinorCompactionTime = lastMajorCompactionTime = MathUtils.now();
+        scheduleMajorCompactionCron = conf.getScheduleMajorCompactionCron();
+        if (scheduleMajorCompactionCron == null) {
+            enableCronBasedCompactionSchedule.set(false);
+        }
     }
 
     public void enableForceGC() {
@@ -309,6 +336,23 @@ public void start() {
             scheduledFuture.cancel(false);
         }
         scheduledFuture = gcExecutor.scheduleAtFixedRate(this, gcWaitTime, gcWaitTime, TimeUnit.MILLISECONDS);
+        if (enableCronBasedCompactionSchedule.get()) {
+            try {
+                doCronBasedSchedule();
+            } catch (SchedulerException se) {
+                LOG.error("Schedule CronBasedCompaction fail in GarbageCollectorThread", se);
+            }
+        }
+    }
+
+    private void doCronBasedSchedule() throws SchedulerException {
+        // Keys are left for Quartz to generate: StdSchedulerFactory.getScheduler() returns the same default
+        // scheduler to every caller in the JVM, so fixed keys collide when a second GC thread schedules.
+        JobDetail job = newJob(CronBasedCompactionJob.class).build();
+        CronTrigger trigger = newTrigger().withSchedule(cronSchedule(scheduleMajorCompactionCron)).build();
+        job.getJobDataMap().put("gcThread", this);
+        sched.scheduleJob(job, trigger);
+        sched.start();
     }
 
     @Override
@@ -477,6 +521,7 @@ public void shutdown() throws InterruptedException {
 
         // Interrupt GC executor thread
         gcExecutor.shutdownNow();
+//        sched.shutdown(true);
     }
 
     /**
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 3a7c763de5..4dd6bb6aaa 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -42,6 +42,7 @@
     protected static final String MINOR_COMPACTION_THRESHOLD = "minorCompactionThreshold";
     protected static final String MAJOR_COMPACTION_INTERVAL = "majorCompactionInterval";
     protected static final String MAJOR_COMPACTION_THRESHOLD = "majorCompactionThreshold";
+    protected static final String SCHEDULE_MAJOR_COMPACTION_CRON = "scheduleMajorCompactionCron";
     protected static final String IS_THROTTLE_BY_BYTES = "isThrottleByBytes";
     protected static final String COMPACTION_MAX_OUTSTANDING_REQUESTS = "compactionMaxOutstandingRequests";
     protected static final String COMPACTION_RATE = "compactionRate";
@@ -1117,6 +1118,29 @@ public ServerConfiguration setMajorCompactionThreshold(double threshold) {
         return this;
     }
 
+    /**
+     * Set the cron expression that schedules major compaction.
+     * Choose times when system load is low, since that is when
+     * deeper compaction is best performed.
+     *
+     * @param scheduleMajorCompactionCron
+     *          cron expression (Quartz syntax) for the schedule.
+     * @return server configuration
+     */
+    public ServerConfiguration setScheduleMajorCompactionCron(String scheduleMajorCompactionCron) {
+        setProperty(SCHEDULE_MAJOR_COMPACTION_CRON, scheduleMajorCompactionCron);
+        return this;
+    }
+
+    /**
+     * Get the cron expression that schedules major compaction; null (the default) disables it.
+     * Example: "0 * 0-3 ? * SAT,SUN" fires every minute from 00:00 to 03:59 on Saturday and Sunday.
+     * @return the configured cron expression, or null if unset
+     */
+    public String getScheduleMajorCompactionCron() {
+        return getString(SCHEDULE_MAJOR_COMPACTION_CRON, null);
+    }
+
     /**
      * Get interval to run minor compaction, in seconds.
      *
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
index 6873a5c53f..f1b63580fa 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/CompactionTest.java
@@ -533,6 +533,42 @@ public void testMajorCompaction() throws Exception {
         verifyLedger(lhs[1].getId(), 0, lhs[1].getLastAddConfirmed());
     }
 
+    @Test
+    public void testCronBasedCompactionSchedule() throws Exception {
+        // prepare data
+        LedgerHandle[] lhs = prepareData(3, true);
+        for (LedgerHandle lh : lhs) {
+            lh.close();
+        }
+        long lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+        long lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+        assertTrue(getGCThread().enableMajorCompaction);
+        assertTrue(getGCThread().enableMinorCompaction);
+        // remove ledger1 and ledger2
+        bkc.deleteLedger(lhs[0].getId());
+        bkc.deleteLedger(lhs[1].getId());
+        LOG.info("Finished deleting the ledgers contains less entries.");
+        // compaction should not be executed by default
+        assertTrue(getGCThread().lastMinorCompactionTime == lastMinorCompactionTime);
+        assertTrue(getGCThread().lastMajorCompactionTime == lastMajorCompactionTime);
+        baseConf.setScheduleMajorCompactionCron("0/1 * * * * ?"); // execute every second
+        restartBookies(baseConf);
+        // Compare with the restarted GC thread's own start times (old ones are trivially exceeded); await cron.
+        lastMinorCompactionTime = getGCThread().lastMinorCompactionTime;
+        lastMajorCompactionTime = getGCThread().lastMajorCompactionTime;
+        for (int i = 0; i < 100 && (getGCThread().lastMajorCompactionTime == lastMajorCompactionTime
+                || getGCThread().lastMinorCompactionTime == lastMinorCompactionTime); i++) {
+            Thread.sleep(100); // poll for up to ~10 seconds
+        }
+        assertTrue(getGCThread().lastMinorCompactionTime > lastMinorCompactionTime);
+        assertTrue(getGCThread().lastMajorCompactionTime > lastMajorCompactionTime);
+        // entry logs ([0,1,2].log) should not be compacted, all remaining >= 0.7
+        for (File ledgerDirectory : tmpDirs) {
+            assertTrue("Not Found entry log file ([0,1,2].log that should have not been compacted in ledgerDirectory: "
+                    + ledgerDirectory, TestUtils.hasLogFiles(ledgerDirectory, false, 0, 1, 2));
+        }
+    }
+
     @Test
     public void testMajorCompactionAboveThreshold() throws Exception {
         // prepare data
diff --git a/site/_data/config/bk_server.yaml b/site/_data/config/bk_server.yaml
index 54a26c4be6..9a89863dbd 100644
--- a/site/_data/config/bk_server.yaml
+++ b/site/_data/config/bk_server.yaml
@@ -312,6 +312,9 @@ groups:
   - param: majorCompactionThreshold
     description: Threshold of major compaction. For those entry log files whose remaining size percentage reaches below  this threshold will be compacted in a major compaction.  Those entry log files whose remaining size percentage is still higher than the threshold will never be compacted. If it is set to less than zero, the minor compaction is disabled.
     default: 0.8
+  - param: scheduleMajorCompactionCron
+    description: When scheduleMajorCompactionCron is set to a valid Quartz cron expression, major compaction is executed when the compaction time falls within the times matched by that expression (e.g. "0 * 0-3 ? * SAT,SUN" for 00:00-03:59 on weekends). Unset by default.
+    default:
   - param: majorCompactionInterval
     description: Interval to run major compaction, in seconds. If it is set to less than zero, the major compaction is disabled.
     default: 86400


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services