You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/10/06 09:03:22 UTC

svn commit: r1763531 - in /jackrabbit/oak/trunk/oak-segment-tar/src: main/java/org/apache/jackrabbit/oak/segment/file/ test/java/org/apache/jackrabbit/oak/segment/file/

Author: mduerig
Date: Thu Oct  6 09:03:22 2016
New Revision: 1763531

URL: http://svn.apache.org/viewvc?rev=1763531&view=rev
Log:
OAK-4138: Decouple revision cleanup from the flush thread
Introduce a scheduler for executing the background tasks

Added:
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java
    jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java
    jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java?rev=1763531&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java Thu Oct  6 09:03:22 2016
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.Thread.currentThread;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code Runnable} implementation that is safe to submit to an executor
+ * or {@link Scheduler}.
+ * <p>
+ * When this implementation's {@link #run()} method is invoked, it will set
+ * the name of the current thread to the name passed to {@link SafeRunnable},
+ * run the wrapped runnable and finally restore the initial thread name.
+ * When the wrapped runnable throws any unhandled exception, this exception
+ * is logged at error level and the exception is re-thrown.
+ */
+public class SafeRunnable implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(SafeRunnable.class);
+
+    @Nonnull
+    private final String name;
+
+    @Nonnull
+    private final Runnable runnable;
+
+    /**
+     * New instance with the given {@code name} wrapping the passed {@code runnable}.
+     * @param name
+     * @param runnable
+     */
+    public SafeRunnable(@Nonnull String name, @Nonnull Runnable runnable) {
+        this.name = checkNotNull(name);
+        this.runnable = checkNotNull(runnable);
+    }
+
+    /**
+     * See class comment
+     */
+    @Override
+    public void run() {
+        String n = currentThread().getName();
+        currentThread().setName(name);
+        try {
+            runnable.run();
+        } catch (Throwable e) {
+            LOG.error("Uncaught exception in {}", name, e);
+            throw e;
+        } finally {
+            currentThread().setName(n);
+        }
+    }
+}

Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java?rev=1763531&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java Thu Oct  6 09:03:22 2016
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.Executors.defaultThreadFactory;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.Closeable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple scheduler for executing and scheduling tasks in the background.
+ * This implementation delegates all background execution to an instance
+ * of a {@link ScheduledExecutorService} with core pool size 1. The behaviour
+ * of this underlying scheduler service determines the semantics of the methods
+ * in this class. Namely: Execution of background tasks never overlaps and is
+ * FIFO for tasks scheduled for the same time.
+ * In addition all tasks scheduled through methods of this class are automatically
+ * wrapped into {@link SafeRunnable} instances. The background thread executing
+ * submitted tasks is a daemon thread.
+ */
+public class Scheduler implements Closeable {
+    private static int schedulerNumber = 0;
+    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+
+    @Nonnull
+    private final String name;
+
+    @Nonnull
+    private final ScheduledExecutorService executor;
+
+    /**
+     * Create a new instance with the given {@code name}. The name is used as the
+     * name of the background thread while it is not executing a task.
+     * @param name  scheduler name; if {@code null}, {@code "scheduler-" + n} is used
+     */
+    public Scheduler(@Nullable String name) {
+        if (name == null) {
+            synchronized (Scheduler.class) {
+                this.name = "scheduler-" + schedulerNumber;
+                schedulerNumber++;
+            }
+        } else {
+            this.name = name;
+        }
+        this.executor = newScheduledThreadPool(1, new SchedulerThreadFactory(this.name));
+    }
+
+    /**
+     * Immediately execute {@code task}. The background thread's name is
+     * set to {@code name} during execution of {@code task}.
+     * @param name  thread name to use while {@code task} runs
+     * @param task  the task to run
+     * @see ScheduledExecutorService#execute(Runnable)
+     */
+    public void execute(@Nonnull String name, @Nonnull Runnable task) {
+        executor.execute(new SafeRunnable(name, task));
+    }
+
+    /**
+     * Run {@code task} once after some delay. The background thread's name is
+     * set to {@code name} during execution of {@code task}.
+     * @param name   thread name to use while {@code task} runs
+     * @param delay  time to wait before running {@code task}
+     * @param unit   time unit of {@code delay}
+     * @param task   the task to run
+     * @see ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
+     */
+    public void scheduleOnce(
+            @Nonnull String name,
+            long delay,
+            @Nonnull TimeUnit unit,
+            @Nonnull Runnable task) {
+        executor.schedule(new SafeRunnable(name, task), delay, unit);
+    }
+
+    /**
+     * Run {@code task} regularly at a given interval. The background thread's name is
+     * set to {@code name} during execution of {@code task}. Because {@link SafeRunnable}
+     * re-throws, an execution that throws suppresses all subsequent executions, as
+     * specified by the underlying {@link ScheduledExecutorService}.
+     * @param name    thread name to use while {@code task} runs
+     * @param period  delay before the first run and period between successive runs
+     * @param unit    time unit of {@code period}
+     * @param task    the task to run
+     * @see ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)
+     */
+    public void scheduleAtFixedRate(
+            @Nonnull String name,
+            long period,
+            @Nonnull TimeUnit unit,
+            @Nonnull Runnable task) {
+        executor.scheduleAtFixedRate(new SafeRunnable(name, task), period, period, unit);
+    }
+
+    /**
+     * Close this scheduler and wait up to 5 seconds for currently executing tasks to
+     * finish. Logs a warning if they did not finish in time or if the wait is interrupted.
+     * @see ScheduledExecutorService#shutdown()
+     */
+    @Override
+    public void close() {
+        try {
+            executor.shutdown();
+            if (executor.awaitTermination(5, SECONDS)) {
+                LOG.debug("The scheduler {} was successfully shut down", name);
+            } else {
+                LOG.warn("The scheduler {} takes too long to shutdown", name);
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupt while shutting down the scheduler {}", name, e);
+            currentThread().interrupt();
+        }
+    }
+
+    /** Creates daemon threads that carry the name passed to the constructor. */
+    private static class SchedulerThreadFactory implements ThreadFactory {
+        private final ThreadFactory threadFactory = defaultThreadFactory();
+
+        @Nonnull
+        private final String name;
+
+        public SchedulerThreadFactory(@Nonnull String name) {
+            this.name = name;
+        }
+
+        @Override
+        public Thread newThread(Runnable runnable) {
+            Thread thread = threadFactory.newThread(runnable);
+            thread.setName(name);
+            thread.setDaemon(true);
+            return thread;
+        }
+    }
+
+}

Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java?rev=1763531&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java Thu Oct  6 09:03:22 2016
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Test;
+
+/** Tests that {@link Scheduler} runs tasks under the requested thread name. */
+public class SchedulerTest {
+    private final Scheduler scheduler = new Scheduler("test-scheduler");
+
+    @After
+    public void tearDown() {
+        scheduler.close();
+    }
+
+    @Test
+    public void execute() throws Exception {
+        TestTask probe = new TestTask(1);
+        scheduler.execute("execute", probe);
+        // The single run must finish and must have seen the requested thread name.
+        assertTrue(probe.await());
+        assertEquals("execute", probe.getThreadName());
+    }
+
+    @Test
+    public void scheduleOnce() throws Exception {
+        TestTask probe = new TestTask(1);
+        scheduler.scheduleOnce("scheduleOnce", 1, SECONDS, probe);
+        // Nothing is recorded before the delayed run has happened.
+        assertNull(probe.getThreadName());
+        assertTrue(probe.await());
+        assertEquals("scheduleOnce", probe.getThreadName());
+    }
+
+    @Test
+    public void scheduleAtFixedRate() throws Exception {
+        TestTask probe = new TestTask(5);
+        scheduler.scheduleAtFixedRate("scheduleAtFixedRate", 200, MILLISECONDS, probe);
+        // The name is only recorded on the fifth run, so it is still unset here.
+        assertNull(probe.getThreadName());
+        assertTrue(probe.await());
+        assertEquals("scheduleAtFixedRate", probe.getThreadName());
+    }
+
+    /** Task that counts its runs down and records the thread name of the last expected run. */
+    private static class TestTask implements Runnable {
+        private final AtomicReference<String> observedName = new AtomicReference<>();
+        private final CountDownLatch remainingRuns;
+        public TestTask(int count) {
+            remainingRuns = new CountDownLatch(count);
+        }
+
+        @Override
+        public void run() {
+            if (remainingRuns.getCount() == 1) {
+                observedName.set(currentThread().getName());
+            }
+            remainingRuns.countDown();
+        }
+
+        public boolean await() throws InterruptedException {
+            return remainingRuns.await(5, SECONDS);
+        }
+
+        public String getThreadName() {
+            return observedName.get();
+        }
+    }
+}
\ No newline at end of file