You are viewing a plain text version of this content. The canonical link for it is here.
Posted to oak-commits@jackrabbit.apache.org by md...@apache.org on 2016/10/06 09:03:22 UTC
svn commit: r1763531 - in /jackrabbit/oak/trunk/oak-segment-tar/src:
main/java/org/apache/jackrabbit/oak/segment/file/
test/java/org/apache/jackrabbit/oak/segment/file/
Author: mduerig
Date: Thu Oct 6 09:03:22 2016
New Revision: 1763531
URL: http://svn.apache.org/viewvc?rev=1763531&view=rev
Log:
OAK-4138: Decouple revision cleanup from the flush thread
Introduce a scheduler for executing the background tasks
Added:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java
jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java?rev=1763531&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/SafeRunnable.java Thu Oct 6 09:03:22 2016
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.Thread.currentThread;
+
+import javax.annotation.Nonnull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code Runnable} decorator that makes a task safe to submit to an executor
+ * or {@link Scheduler}.
+ * <p>
+ * While {@link #run()} executes, the current thread carries the name passed to
+ * the constructor; the thread's previous name is restored once the wrapped
+ * runnable returns or throws. Any {@code Throwable} escaping the wrapped
+ * runnable is logged at error level and then re-thrown to the caller.
+ */
+public class SafeRunnable implements Runnable {
+    private static final Logger LOG = LoggerFactory.getLogger(SafeRunnable.class);
+
+    @Nonnull
+    private final String name;
+
+    @Nonnull
+    private final Runnable runnable;
+
+    /**
+     * Wrap {@code runnable} so that it runs under the thread name {@code name}.
+     * @param name      thread name to use while {@code runnable} is running
+     * @param runnable  the task to delegate to
+     */
+    public SafeRunnable(@Nonnull String name, @Nonnull Runnable runnable) {
+        this.name = checkNotNull(name);
+        this.runnable = checkNotNull(runnable);
+    }
+
+    /**
+     * Run the wrapped task under the configured thread name.
+     */
+    @Override
+    public void run() {
+        Thread thread = currentThread();
+        String previousName = thread.getName();
+        thread.setName(name);
+        try {
+            runnable.run();
+        } catch (Throwable t) {
+            LOG.error("Uncaught exception in {}", name, t);
+            throw t;
+        } finally {
+            thread.setName(previousName);
+        }
+    }
+}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java?rev=1763531&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/Scheduler.java Thu Oct 6 09:03:22 2016
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.Executors.defaultThreadFactory;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+import java.io.Closeable;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A simple scheduler for executing and scheduling tasks in the background.
+ * This implementation delegates all background execution to an instance
+ * of a {@link ScheduledExecutorService} with core pool size 1. The behaviour
+ * of this underlying scheduler service determines the semantics of the methods
+ * in this class. Namely: Execution of background tasks never overlaps and is
+ * FIFO for tasks scheduled for the same time.
+ * In addition all tasks scheduled through methods of this class are automatically
+ * wrapped into {@link SafeRunnable} instances. The background thread executing
+ * submitted tasks is a daemon thread.
+ */
+public class Scheduler implements Closeable {
+    private static int schedulerNumber = 0;
+    private static final Logger LOG = LoggerFactory.getLogger(Scheduler.class);
+
+    // NOTE(review): not referenced anywhere in this class; candidate for removal.
+    @Nonnull
+    private final AtomicLong executionCounter = new AtomicLong();
+
+    @Nonnull
+    private final String name;
+
+    @Nonnull
+    private final ScheduledExecutorService executor;
+
+    /**
+     * Create a new instance with the given {@code name}. The name is used as
+     * the default name of the background thread. When {@code name} is
+     * {@code null}, a name of the form {@code scheduler-N} is generated.
+     * @param name  name of this scheduler or {@code null} for a generated name
+     */
+    public Scheduler(@Nullable String name) {
+        this.name = name == null ? nextDefaultName() : name;
+        this.executor = newScheduledThreadPool(1, new SchedulerThreadFactory(this.name));
+    }
+
+    /** Generate the next {@code scheduler-N} name from the class-wide counter. */
+    private static synchronized String nextDefaultName() {
+        return "scheduler-" + schedulerNumber++;
+    }
+
+    /**
+     * Immediately execute {@code task}. The background thread's name is
+     * set to {@code name} during execution of {@code task}.
+     * @param name  thread name to use while {@code task} is running
+     * @param task  the task to execute
+     * @see ScheduledExecutorService#execute(Runnable)
+     */
+    public void execute(@Nonnull String name, @Nonnull Runnable task) {
+        executor.execute(new SafeRunnable(name, task));
+    }
+
+    /**
+     * Run {@code task} once after some delay. The background thread's name is
+     * set to {@code name} during execution of {@code task}.
+     * @param name   thread name to use while {@code task} is running
+     * @param delay  time to wait before running {@code task}
+     * @param unit   time unit of {@code delay}
+     * @param task   the task to run
+     * @see ScheduledExecutorService#schedule(Runnable, long, TimeUnit)
+     */
+    public void scheduleOnce(
+            @Nonnull String name, long delay, @Nonnull TimeUnit unit, @Nonnull Runnable task) {
+        executor.schedule(new SafeRunnable(name, task), delay, unit);
+    }
+
+    /**
+     * Run {@code task} regularly at a given interval; the first run happens after
+     * one {@code period}. The background thread's name is set to {@code name}
+     * during execution of {@code task}. As {@link SafeRunnable} re-throws what
+     * the task throws, a failing run suppresses all subsequent runs (see the
+     * contract of the method referenced below).
+     * @param name    thread name to use while {@code task} is running
+     * @param period  initial delay and interval between successive runs
+     * @param unit    time unit of {@code period}
+     * @param task    the task to run
+     * @see ScheduledExecutorService#scheduleAtFixedRate(Runnable, long, long, TimeUnit)
+     */
+    public void scheduleAtFixedRate(
+            @Nonnull String name, long period, @Nonnull TimeUnit unit, @Nonnull Runnable task) {
+        executor.scheduleAtFixedRate(new SafeRunnable(name, task), period, period, unit);
+    }
+
+    /**
+     * Close this scheduler and wait up to 5 seconds for currently executing tasks
+     * to finish. Logs a warning if not all tasks finished executing after 5 seconds.
+     * If the calling thread is interrupted while waiting, a warning is logged and
+     * the thread's interrupt status is restored.
+     * @see ScheduledExecutorService#shutdown()
+     */
+    @Override
+    public void close() {
+        try {
+            executor.shutdown();
+            if (executor.awaitTermination(5, SECONDS)) {
+                LOG.debug("The scheduler {} was successfully shut down", name);
+            } else {
+                LOG.warn("The scheduler {} takes too long to shutdown", name);
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("Interrupt while shutting down the scheduler {}", name, e);
+            currentThread().interrupt();
+        }
+    }
+
+    /** Creates daemon threads that carry the name of the owning scheduler. */
+    private static class SchedulerThreadFactory implements ThreadFactory {
+        private final ThreadFactory threadFactory = defaultThreadFactory();
+
+        @Nonnull
+        private final String name;
+
+        public SchedulerThreadFactory(@Nonnull String name) {
+            this.name = name;
+        }
+
+        @Override
+        public Thread newThread(Runnable runnable) {
+            Thread thread = threadFactory.newThread(runnable);
+            thread.setName(name);
+            thread.setDaemon(true);
+            return thread;
+        }
+    }
+
+}
Added: jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java
URL: http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java?rev=1763531&view=auto
==============================================================================
--- jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java (added)
+++ jackrabbit/oak/trunk/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/file/SchedulerTest.java Thu Oct 6 09:03:22 2016
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.jackrabbit.oak.segment.file;
+
+import static java.lang.Thread.currentThread;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Test;
+
+public class SchedulerTest {
+    private final Scheduler scheduler = new Scheduler("test-scheduler");
+
+    @After
+    public void tearDown() {
+        scheduler.close();
+    }
+
+    @Test
+    public void execute() throws Exception {
+        TestTask probe = new TestTask(1);
+        scheduler.execute("execute", probe);
+        // wait for the run, then check the thread name it observed
+        assertTrue(probe.await());
+        assertEquals("execute", probe.getThreadName());
+    }
+
+    @Test
+    public void scheduleOnce() throws Exception {
+        TestTask probe = new TestTask(1);
+        scheduler.scheduleOnce("scheduleOnce", 1, SECONDS, probe);
+        // nothing is expected to be recorded before the delay has elapsed
+        assertNull(probe.getThreadName());
+        assertTrue(probe.await());
+        assertEquals("scheduleOnce", probe.getThreadName());
+    }
+
+    @Test
+    public void scheduleAtFixedRate() throws Exception {
+        TestTask probe = new TestTask(5);
+        scheduler.scheduleAtFixedRate("scheduleAtFixedRate", 200, MILLISECONDS, probe);
+        // the name is only recorded by the last of the 5 expected runs
+        assertNull(probe.getThreadName());
+        assertTrue(probe.await());
+        assertEquals("scheduleAtFixedRate", probe.getThreadName());
+    }
+
+    /** Counts its runs down and records the thread name seen by the final expected run. */
+    private static class TestTask implements Runnable {
+        private final CountDownLatch remaining;
+        private final AtomicReference<String> observedName = new AtomicReference<>();
+
+        public TestTask(int runs) {
+            remaining = new CountDownLatch(runs);
+        }
+
+        public boolean await() throws InterruptedException {
+            return remaining.await(5, SECONDS);
+        }
+
+        public String getThreadName() {
+            return observedName.get();
+        }
+
+        @Override
+        public void run() {
+            if (remaining.getCount() == 1) {
+                observedName.set(currentThread().getName());
+            }
+            remaining.countDown();
+        }
+    }
+}
\ No newline at end of file