You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/09/27 09:03:39 UTC

[bookkeeper] branch master updated: ISSUE #517: port shared-resource-manager utils from distributredlog

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 80b7676  ISSUE #517: port shared-resource-manager utils from distributredlog
80b7676 is described below

commit 80b7676045c34f4678f754f7f78d77c156ba518d
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Sep 27 11:03:33 2017 +0200

    ISSUE #517: port shared-resource-manager utils from distributredlog
    
    Descriptions of the changes in this PR:
    
    Try to port shared-resource-manager utils from DistributedLog, so they can be used later on.
    
    Author: Jia Zhai <zh...@apache.org>
    
    Reviewers: Enrico Olivelli <eo...@apache.org>
    
    This closes #518 from zhaijack/shared_resource, closes #517
---
 .../bookkeeper/common/util/ExecutorUtils.java      |  47 ++++
 .../common/util/LogExceptionRunnable.java          |  54 +++++
 .../bookkeeper/common/util/ReferenceCounted.java   |  35 +++
 .../common/util/SharedResourceManager.java         | 164 +++++++++++++
 .../bookkeeper/common/util/package-info.java       |  22 ++
 .../common/util/TestSharedResourceManager.java     | 256 +++++++++++++++++++++
 6 files changed, 578 insertions(+)

diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExecutorUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExecutorUtils.java
new file mode 100644
index 0000000..26c2f2b
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExecutorUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Static helpers for creating the threads that back executors.
+ */
+public class ExecutorUtils {
+
+    /**
+     * Build a {@link ThreadFactory} layered on Guava's {@code MoreExecutors.platformThreadFactory()}.
+     *
+     * @param nameFormat name format applied to every thread created by the factory.
+     * @param daemon     {@code true} if the threads the factory creates are daemon threads,
+     *                   {@code false} otherwise.
+     * @return a {@link ThreadFactory} carrying the given name format and daemon setting.
+     */
+    public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) {
+        ThreadFactory threadFactory = MoreExecutors.platformThreadFactory();
+        return new ThreadFactoryBuilder()
+            .setThreadFactory(threadFactory)
+            .setDaemon(daemon)
+            .setNameFormat(nameFormat)
+            .build();
+    }
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/LogExceptionRunnable.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/LogExceptionRunnable.java
new file mode 100644
index 0000000..bf2ddac
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/LogExceptionRunnable.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A {@link Runnable} decorator that logs any {@link Throwable} escaping the wrapped task and then
+ * propagates it: unchecked ones as-is, anything else wrapped in an {@link AssertionError}.
+ */
+@Slf4j
+public final class LogExceptionRunnable implements Runnable {
+
+  private final Runnable task;
+
+  public LogExceptionRunnable(Runnable task) {
+    this.task = checkNotNull(task);
+  }
+
+  @Override
+  public void run() {
+    try {
+      task.run();
+    } catch (Throwable t) {
+      log.error("Exception while executing runnable " + task, t);
+      Throwables.throwIfUnchecked(t);
+      throw new AssertionError(t);
+    }
+  }
+
+  @Override
+  public String toString() {
+    return "LogExceptionRunnable(" + task + ")";
+  }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ReferenceCounted.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ReferenceCounted.java
new file mode 100644
index 0000000..0e17a10
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ReferenceCounted.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+/**
+ * Contract for an object whose holders are tracked by a reference count.
+ */
+public interface ReferenceCounted {
+
+    /**
+     * Acquire one reference to this object.
+     */
+    void retain();
+
+    /**
+     * Give back one reference previously acquired with {@link #retain()}.
+     */
+    void release();
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SharedResourceManager.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SharedResourceManager.java
new file mode 100644
index 0000000..b1b67b2
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SharedResourceManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * A holder for shared resource singletons.
+ *
+ * <p>Components like clients and servers need certain resources, e.g. a scheduler, to run.
+ * If the user has not provided such resources, these components use a default one, shared
+ * as a static resource. This class holds these defaults and manages their lifecycles.
+ *
+ * <p>A resource is identified by the reference (identity, not equality) of a {@link Resource}
+ * object, typically a singleton, passed to {@code get()} and {@code release()}. Both methods
+ * are synchronized on the manager.
+ *
+ * <p>Resources are ref-counted: when the refcount reaches zero a destroy task is scheduled to run
+ * {@code DESTROY_DELAY_SECONDS} seconds later; it is cancelled if the resource is requested again first.
+ */
+public class SharedResourceManager {
+
+    static final long DESTROY_DELAY_SECONDS = 1;
+
+    private static final SharedResourceManager SHARED = create();
+
+    public static SharedResourceManager shared() {
+        return SHARED;
+    }
+
+    public static SharedResourceManager create() {
+        return create(() -> Executors.newSingleThreadScheduledExecutor(
+            ExecutorUtils.getThreadFactory("bookkeeper-shared-destroyer-%d", true)));
+    }
+
+    public static SharedResourceManager create(Supplier<ScheduledExecutorService> destroyerFactory) {
+        return new SharedResourceManager(destroyerFactory);
+    }
+
+    /**
+     * Describes one kind of shared resource: how its instance is created and how it is closed.
+     *
+     * @param <T> type of the resource instance.
+     */
+    public interface Resource<T> {
+        /**
+         * Create a new instance; called by the manager when none is cached for this resource.
+         *
+         * @return a new instance of the resource.
+         */
+        T create();
+
+        /**
+         * Destroy the given instance; called from the manager's delayed destroy task.
+         * NOTE(review): if this throws, the cached entry is not removed from the manager.
+         * @param instance the instance to destroy.
+         */
+        void close(T instance);
+    }
+
+    private static class Instance implements ReferenceCounted {
+
+        private final Object instance;
+        private int refCount;
+        ScheduledFuture<?> destroyTask;
+
+        Instance(Object instance) {
+            this.instance = instance;
+        }
+
+        @Override
+        public void retain() {
+            ++refCount;
+        }
+
+        @Override
+        public void release() {
+            --refCount;
+        }
+
+        void cancelDestroyTask() {
+            if (null != destroyTask) {
+                destroyTask.cancel(false);
+                destroyTask = null;
+            }
+        }
+
+    }
+
+    private final IdentityHashMap<Resource<?>, Instance> instances =
+        new IdentityHashMap<>();
+    private final Supplier<ScheduledExecutorService> destroyerFactory;
+    private ScheduledExecutorService destroyer;
+
+    private SharedResourceManager(Supplier<ScheduledExecutorService> destroyerFactory) {
+        this.destroyerFactory = destroyerFactory;
+    }
+
+    @SuppressWarnings("unchecked")
+    public synchronized <T> T get(Resource<T> resource) {
+        Instance instance = instances.get(resource);
+        if (null == instance) {
+            instance = new Instance(resource.create());
+            instances.put(resource, instance);
+        }
+        instance.cancelDestroyTask();
+        instance.retain();
+        return (T) instance.instance;
+    }
+
+    public synchronized <T> void release(final Resource<T> resource,
+                                         final T instance) {
+        final Instance cached = instances.get(resource);
+        checkArgument(null != cached, "No cached instance found for %s", resource);
+        checkArgument(instance == cached.instance, "Release the wrong instance for %s", resource);
+        checkState(cached.refCount > 0, "Refcount has already reached zero for %s", resource);
+        cached.release();
+        if (0 == cached.refCount) {
+            checkState(null == cached.destroyTask, "Destroy task already scheduled for %s", resource);
+            if (null == destroyer) {
+                destroyer = destroyerFactory.get();
+            }
+            cached.destroyTask = destroyer.schedule(new LogExceptionRunnable(() -> {
+                synchronized (SharedResourceManager.this) {
+                    // A get() may have raised the refcount since this task was scheduled, so re-check it.
+                    if (cached.refCount == 0) {
+                        resource.close(instance);
+                        instances.remove(resource);
+                        if (instances.isEmpty()) {
+                            destroyer.shutdown();
+                            destroyer = null;
+                        }
+                    }
+                }
+            }), DESTROY_DELAY_SECONDS, TimeUnit.SECONDS);
+        }
+    }
+
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/package-info.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/package-info.java
new file mode 100644
index 0000000..33c6cd9
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Util functions used across the project.
+ */
+package org.apache.bookkeeper.common.util;
\ No newline at end of file
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSharedResourceManager.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSharedResourceManager.java
new file mode 100644
index 0000000..fab622f
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSharedResourceManager.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Unit tests for {@link SharedResourceManager}, using a mock destroyer whose tasks are run by hand.
+ */
+public class TestSharedResourceManager {
+
+    private final LinkedList<MockScheduledFuture<?>> scheduledDestroyTasks =
+        new LinkedList<>();
+
+    private SharedResourceManager manager;
+
+    private static class ResourceInstance {
+        volatile boolean closed;
+    }
+
+    private static class ResourceFactory implements Resource<ResourceInstance> {
+        @Override
+        public ResourceInstance create() {
+            return new ResourceInstance();
+        }
+
+        @Override
+        public void close(ResourceInstance instance) {
+            instance.closed = true;
+        }
+    }
+
+    // Two distinct resource keys, each mapping to its own cached instance
+    private static final Resource<ResourceInstance> SHARED_FOO = new ResourceFactory();
+    private static final Resource<ResourceInstance> SHARED_BAR = new ResourceFactory();
+
+    @Before
+    public void setUp() {
+        manager = SharedResourceManager.create(new MockExecutorFactory());
+    }
+
+    @Test
+    public void destroyResourceWhenRefCountReachesZero() {
+        ResourceInstance foo1 = manager.get(SHARED_FOO);
+        ResourceInstance sharedFoo = foo1;
+        ResourceInstance foo2 = manager.get(SHARED_FOO);
+        assertSame(sharedFoo, foo2);
+
+        ResourceInstance bar1 = manager.get(SHARED_BAR);
+        ResourceInstance sharedBar = bar1;
+
+        manager.release(SHARED_FOO, foo2);
+        // foo refcount has not reached 0 yet, thus shared foo is not closed
+        assertTrue(scheduledDestroyTasks.isEmpty());
+        assertFalse(sharedFoo.closed);
+
+        manager.release(SHARED_FOO, foo1);
+
+        // foo refcount has reached 0, a destroying task is scheduled
+        assertEquals(1, scheduledDestroyTasks.size());
+        MockScheduledFuture<?> scheduledDestroyTask = scheduledDestroyTasks.poll();
+        assertEquals(SharedResourceManager.DESTROY_DELAY_SECONDS,
+            scheduledDestroyTask.getDelay(TimeUnit.SECONDS));
+
+        // Simulate that the destroyer executes the foo destroying task
+        scheduledDestroyTask.runTask();
+        assertTrue(sharedFoo.closed);
+
+        // After the destroying, obtaining a foo will get a different instance
+        ResourceInstance foo3 = manager.get(SHARED_FOO);
+        assertNotSame(sharedFoo, foo3);
+
+        manager.release(SHARED_BAR, bar1);
+
+        // bar refcount has reached 0, a destroying task is scheduled
+        assertEquals(1, scheduledDestroyTasks.size());
+        scheduledDestroyTask = scheduledDestroyTasks.poll();
+        assertEquals(SharedResourceManager.DESTROY_DELAY_SECONDS,
+            scheduledDestroyTask.getDelay(TimeUnit.SECONDS));
+
+        // Simulate that the destroyer executes the bar destroying task
+        scheduledDestroyTask.runTask();
+        assertTrue(sharedBar.closed);
+    }
+
+    @Test
+    public void cancelDestroyTask() {
+        ResourceInstance foo1 = manager.get(SHARED_FOO);
+        ResourceInstance sharedFoo = foo1;
+        manager.release(SHARED_FOO, foo1);
+        // A destroying task for foo is scheduled
+        MockScheduledFuture<?> scheduledDestroyTask = scheduledDestroyTasks.poll();
+        assertFalse(scheduledDestroyTask.cancelled);
+
+        // Obtaining a foo before the destroying task is executed cancels that task
+        ResourceInstance foo2 = manager.get(SHARED_FOO);
+        assertTrue(scheduledDestroyTask.cancelled);
+        assertTrue(scheduledDestroyTasks.isEmpty());
+        assertFalse(sharedFoo.closed);
+
+        // And it will be the same foo instance
+        assertSame(sharedFoo, foo2);
+
+        // Release it and the destroying task is scheduled again
+        manager.release(SHARED_FOO, foo2);
+        scheduledDestroyTask = scheduledDestroyTasks.poll();
+        assertFalse(scheduledDestroyTask.cancelled);
+        scheduledDestroyTask.runTask();
+        assertTrue(sharedFoo.closed);
+    }
+
+    @Test
+    public void releaseWrongInstance() {
+        ResourceInstance uncached = new ResourceInstance();
+        try {
+            manager.release(SHARED_FOO, uncached);
+            fail("Should throw IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            // expected: nothing is cached for SHARED_FOO yet
+        }
+        ResourceInstance cached = manager.get(SHARED_FOO);
+        try {
+            manager.release(SHARED_FOO, uncached);
+            fail("Should throw IllegalArgumentException");
+        } catch (IllegalArgumentException e) {
+            // expected: the released object is not the cached instance
+        }
+        manager.release(SHARED_FOO, cached);
+    }
+
+    @Test
+    public void overreleaseInstance() {
+        ResourceInstance foo1 = manager.get(SHARED_FOO);
+        manager.release(SHARED_FOO, foo1);
+        try {
+            manager.release(SHARED_FOO, foo1);
+            fail("Should throw IllegalStateException");
+        } catch (IllegalStateException e) {
+            // expected: the refcount is already zero
+        }
+    }
+
+    private class MockExecutorFactory implements Supplier<ScheduledExecutorService> {
+        @Override
+        public ScheduledExecutorService get() {
+            ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
+            when(mockExecutor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))).thenAnswer(
+                (Answer<MockScheduledFuture<Void>>) invocation -> {
+                    Object[] args = invocation.getArguments();
+                    Runnable command = (Runnable) args[0];
+                    long delay = (Long) args[1];
+                    TimeUnit unit = (TimeUnit) args[2];
+                    MockScheduledFuture<Void> future = new MockScheduledFuture<Void>(
+                        command, delay, unit);
+                    scheduledDestroyTasks.add(future);
+                    return future;
+                });
+            return mockExecutor;
+        }
+    }
+
+    private static class MockScheduledFuture<V> implements ScheduledFuture<V> {
+        private boolean cancelled;
+        private boolean finished;
+        final Runnable command;
+        final long delay;
+        final TimeUnit unit;
+
+        MockScheduledFuture(Runnable command, long delay, TimeUnit unit) {
+            this.command = command;
+            this.delay = delay;
+            this.unit = unit;
+        }
+
+        void runTask() {
+            command.run();
+            finished = true;
+        }
+
+        @Override
+        public boolean cancel(boolean interrupt) {
+            if (cancelled || finished) {
+                return false;
+            }
+            cancelled = true;
+            return true;
+        }
+
+        @Override
+        public boolean isCancelled() {
+            return cancelled;
+        }
+
+        @Override
+        public long getDelay(TimeUnit targetUnit) {
+            return targetUnit.convert(this.delay, this.unit);
+        }
+
+        @Override
+        public int compareTo(Delayed o) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public boolean isDone() {
+            return cancelled || finished;
+        }
+
+        @Override
+        public V get() {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public V get(long timeout, TimeUnit unit) {
+            throw new UnsupportedOperationException();
+        }
+    }
+
+}

-- 
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].