You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by eo...@apache.org on 2017/09/27 09:03:39 UTC
[bookkeeper] branch master updated: ISSUE #517: port
shared-resource-manager utils from distributredlog
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 80b7676 ISSUE #517: port shared-resource-manager utils from distributredlog
80b7676 is described below
commit 80b7676045c34f4678f754f7f78d77c156ba518d
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Wed Sep 27 11:03:33 2017 +0200
ISSUE #517: port shared-resource-manager utils from distributredlog
Descriptions of the changes in this PR:
Port the shared-resource-manager utils from DistributedLog, so that they can be used later on.
Author: Jia Zhai <zh...@apache.org>
Reviewers: Enrico Olivelli <eo...@apache.org>
This closes #518 from zhaijack/shared_resource, closes #517
---
.../bookkeeper/common/util/ExecutorUtils.java | 47 ++++
.../common/util/LogExceptionRunnable.java | 54 +++++
.../bookkeeper/common/util/ReferenceCounted.java | 35 +++
.../common/util/SharedResourceManager.java | 164 +++++++++++++
.../bookkeeper/common/util/package-info.java | 22 ++
.../common/util/TestSharedResourceManager.java | 256 +++++++++++++++++++++
6 files changed, 578 insertions(+)
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExecutorUtils.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExecutorUtils.java
new file mode 100644
index 0000000..26c2f2b
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ExecutorUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * Helpers related to executors and the threads they run on.
+ */
+public class ExecutorUtils {
+
+    /**
+     * Build a {@link ThreadFactory} on top of Guava's platform thread factory.
+     *
+     * @param nameFormat format string applied to the names of the created threads.
+     * @param daemon {@code true} to have the factory create daemon threads,
+     *               {@code false} otherwise.
+     * @return the configured {@link ThreadFactory}.
+     */
+    public static ThreadFactory getThreadFactory(String nameFormat, boolean daemon) {
+        final ThreadFactory platformFactory = MoreExecutors.platformThreadFactory();
+        final ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+        builder.setThreadFactory(platformFactory);
+        builder.setDaemon(daemon);
+        builder.setNameFormat(nameFormat);
+        return builder.build();
+    }
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/LogExceptionRunnable.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/LogExceptionRunnable.java
new file mode 100644
index 0000000..bf2ddac
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/LogExceptionRunnable.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.base.Throwables;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A simple wrapper for a {@link Runnable} that logs any {@link Throwable} thrown by it,
+ * before re-throwing it.
+ */
+@Slf4j
+public final class LogExceptionRunnable implements Runnable {
+
+    private final Runnable task;
+
+    /**
+     * Wrap the given task.
+     *
+     * @param task the runnable to execute; must not be {@code null}.
+     */
+    public LogExceptionRunnable(Runnable task) {
+        this.task = checkNotNull(task);
+    }
+
+    /**
+     * Run the wrapped task, logging and then propagating anything it throws.
+     */
+    @Override
+    public void run() {
+        try {
+            task.run();
+        } catch (Throwable t) {
+            // Parameterized message; the trailing Throwable is logged with its stack trace.
+            log.error("Exception while executing runnable {}", task, t);
+            // Unchecked throwables are re-thrown unchanged; anything else is wrapped
+            // because Runnable#run cannot declare a checked exception.
+            Throwables.throwIfUnchecked(t);
+            throw new AssertionError(t);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "LogExceptionRunnable(" + task + ")";
+    }
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ReferenceCounted.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ReferenceCounted.java
new file mode 100644
index 0000000..0e17a10
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/ReferenceCounted.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.bookkeeper.common.util;
+
+/**
+ * An interface for objects that are reference counted through {@link #retain()} and {@link #release()}.
+ */
+public interface ReferenceCounted {
+
+ /**
+ * Retain the reference, i.e. register one more holder of this object.
+ */
+ void retain();
+
+ /**
+ * Release the reference, i.e. unregister one holder of this object.
+ */
+ void release();
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SharedResourceManager.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SharedResourceManager.java
new file mode 100644
index 0000000..b1b67b2
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SharedResourceManager.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.IdentityHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+/**
+ * A holder for shared resource singletons.
+ *
+ * <p>Components like clients and servers need certain resources, e.g. a scheduler,
+ * to run. If the user has not provided such resources, these components will use
+ * a default one, which is shared as a static resource. This class holds these default
+ * resources and manages their lifecycles.
+ *
+ * <p>A resource is identified by the reference of a {@link Resource} object, which
+ * is typically a singleton, provided to the get() and release() methods. Each resource
+ * object (not its class) maps to an object cached in the holder.
+ *
+ * <p>Resources are ref-counted and shut down after a delay when the refcount reaches zero.
+ */
+public class SharedResourceManager {
+
+ static final long DESTROY_DELAY_SECONDS = 1;
+
+ private static final SharedResourceManager SHARED = create();
+
+ public static SharedResourceManager shared() {
+ return SHARED;
+ }
+
+ public static SharedResourceManager create() {
+ return create(() -> Executors.newSingleThreadScheduledExecutor(
+ ExecutorUtils.getThreadFactory("bookkeeper-shared-destroyer-%d", true)));
+ }
+
+ public static SharedResourceManager create(Supplier<ScheduledExecutorService> destroyerFactory) {
+ return new SharedResourceManager(destroyerFactory);
+ }
+
+ /**
+ * Defines a resource, and the way to create and destroy instances of it.
+ *
+ * @param <T> resource type.
+ */
+ public interface Resource<T> {
+ /**
+ * Create a new instance of the resource.
+ *
+ * @return a new instance of the resource.
+ */
+ T create();
+
+ /**
+ * Destroy the given instance.
+ *
+ * @param instance the instance to destroy.
+ */
+ void close(T instance);
+ }
+
+ private static class Instance implements ReferenceCounted {
+
+ private final Object instance;
+ private int refCount;
+ ScheduledFuture<?> destroyTask;
+
+ Instance(Object instance) {
+ this.instance = instance;
+ }
+
+ @Override
+ public void retain() {
+ ++refCount;
+ }
+
+ @Override
+ public void release() {
+ --refCount;
+ }
+
+ void cancelDestroyTask() {
+ if (null != destroyTask) {
+ destroyTask.cancel(false);
+ destroyTask = null;
+ }
+ }
+
+ }
+
+ private final IdentityHashMap<Resource<?>, Instance> instances =
+ new IdentityHashMap<>();
+ private final Supplier<ScheduledExecutorService> destroyerFactory;
+ private ScheduledExecutorService destroyer;
+
+ private SharedResourceManager(Supplier<ScheduledExecutorService> destroyerFactory) {
+ this.destroyerFactory = destroyerFactory;
+ }
+
+ @SuppressWarnings("unchecked")
+ public synchronized <T> T get(Resource<T> resource) {
+ Instance instance = instances.get(resource);
+ if (null == instance) {
+ instance = new Instance(resource.create());
+ instances.put(resource, instance);
+ }
+ instance.cancelDestroyTask();
+ instance.retain();
+ return (T) instance.instance;
+ }
+
+ public synchronized <T> void release(final Resource<T> resource,
+ final T instance) {
+ final Instance cached = instances.get(resource);
+ checkArgument(null != cached, "No cached instance found for %s", resource);
+ checkArgument(instance == cached.instance, "Release the wrong instance for %s", resource);
+ checkState(cached.refCount > 0, "Refcount has already reached zero for %s", resource);
+ cached.release();
+ if (0 == cached.refCount) {
+ checkState(null == cached.destroyTask, "Destroy task already scheduled for %s", resource);
+ if (null == destroyer) {
+ destroyer = destroyerFactory.get();
+ }
+ cached.destroyTask = destroyer.schedule(new LogExceptionRunnable(() -> {
+ synchronized (SharedResourceManager.this) {
+ // Refcount may have gone up since the task was scheduled. Re-check it.
+ if (cached.refCount == 0) {
+ resource.close(instance);
+ instances.remove(resource);
+ if (instances.isEmpty()) {
+ destroyer.shutdown();
+ destroyer = null;
+ }
+ }
+ }
+ }), DESTROY_DELAY_SECONDS, TimeUnit.SECONDS);
+ }
+ }
+
+
+}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/package-info.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/package-info.java
new file mode 100644
index 0000000..33c6cd9
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Util functions used across the project.
+ */
+package org.apache.bookkeeper.common.util;
\ No newline at end of file
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSharedResourceManager.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSharedResourceManager.java
new file mode 100644
index 0000000..fab622f
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSharedResourceManager.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.LinkedList;
+import java.util.concurrent.Delayed;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.bookkeeper.common.util.SharedResourceManager.Resource;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Unit test for {@link SharedResourceManager}.
+ *
+ * <p>The manager under test is created with a mocked executor, so destroy tasks never run
+ * on their own: they are recorded in {@code scheduledDestroyTasks} and run explicitly by the tests.
+ */
+public class TestSharedResourceManager {
+
+ // Destroy tasks handed to the mocked executor, in scheduling order.
+ private final LinkedList<MockScheduledFuture<?>> scheduledDestroyTasks =
+ new LinkedList<>();
+
+ private SharedResourceManager manager;
+
+ // Trivial resource that only remembers whether it has been closed.
+ private static class ResourceInstance {
+ volatile boolean closed;
+ }
+
+ // Resource definition creating ResourceInstance objects and flagging them as closed on close().
+ private static class ResourceFactory implements Resource<ResourceInstance> {
+ @Override
+ public ResourceInstance create() {
+ return new ResourceInstance();
+ }
+
+ @Override
+ public void close(ResourceInstance instance) {
+ instance.closed = true;
+ }
+ }
+
+ // Defines two kinds of resources
+ private static final Resource<ResourceInstance> SHARED_FOO = new ResourceFactory();
+ private static final Resource<ResourceInstance> SHARED_BAR = new ResourceFactory();
+
+ @Before
+ public void setUp() {
+ manager = SharedResourceManager.create(new MockExecutorFactory());
+ }
+
+ @Test
+ public void destroyResourceWhenRefCountReachesZero() {
+ ResourceInstance foo1 = manager.get(SHARED_FOO);
+ ResourceInstance sharedFoo = foo1;
+ ResourceInstance foo2 = manager.get(SHARED_FOO);
+ assertSame(sharedFoo, foo2);
+
+ ResourceInstance bar1 = manager.get(SHARED_BAR);
+ ResourceInstance sharedBar = bar1;
+
+ manager.release(SHARED_FOO, foo2);
+ // foo refcount not reached 0, thus shared foo is not closed
+ assertTrue(scheduledDestroyTasks.isEmpty());
+ assertFalse(sharedFoo.closed);
+
+ manager.release(SHARED_FOO, foo1);
+
+ // foo refcount has reached 0, a destroying task is scheduled
+ assertEquals(1, scheduledDestroyTasks.size());
+ MockScheduledFuture<?> scheduledDestroyTask = scheduledDestroyTasks.poll();
+ assertEquals(SharedResourceManager.DESTROY_DELAY_SECONDS,
+ scheduledDestroyTask.getDelay(TimeUnit.SECONDS));
+
+ // Simulate that the destroyer executes the foo destroying task
+ scheduledDestroyTask.runTask();
+ assertTrue(sharedFoo.closed);
+
+ // After the destroying, obtaining a foo will get a different instance
+ ResourceInstance foo3 = manager.get(SHARED_FOO);
+ assertNotSame(sharedFoo, foo3);
+
+ manager.release(SHARED_BAR, bar1);
+
+ // bar refcount has reached 0, a destroying task is scheduled
+ assertEquals(1, scheduledDestroyTasks.size());
+ scheduledDestroyTask = scheduledDestroyTasks.poll();
+ assertEquals(SharedResourceManager.DESTROY_DELAY_SECONDS,
+ scheduledDestroyTask.getDelay(TimeUnit.SECONDS));
+
+ // Simulate that the destroyer executes the bar destroying task
+ scheduledDestroyTask.runTask();
+ assertTrue(sharedBar.closed);
+ }
+
+ @Test
+ public void cancelDestroyTask() {
+ ResourceInstance foo1 = manager.get(SHARED_FOO);
+ ResourceInstance sharedFoo = foo1;
+ manager.release(SHARED_FOO, foo1);
+ // A destroying task for foo is scheduled
+ MockScheduledFuture<?> scheduledDestroyTask = scheduledDestroyTasks.poll();
+ assertFalse(scheduledDestroyTask.cancelled);
+
+ // obtaining a foo before the destroying task is executed will cancel the destroy
+ ResourceInstance foo2 = manager.get(SHARED_FOO);
+ assertTrue(scheduledDestroyTask.cancelled);
+ assertTrue(scheduledDestroyTasks.isEmpty());
+ assertFalse(sharedFoo.closed);
+
+ // And it will be the same foo instance
+ assertSame(sharedFoo, foo2);
+
+ // Release it and the destroying task is scheduled again
+ manager.release(SHARED_FOO, foo2);
+ scheduledDestroyTask = scheduledDestroyTasks.poll();
+ assertFalse(scheduledDestroyTask.cancelled);
+ scheduledDestroyTask.runTask();
+ assertTrue(sharedFoo.closed);
+ }
+
+ @Test
+ public void releaseWrongInstance() {
+ ResourceInstance uncached = new ResourceInstance();
+ // Nothing has been obtained yet, so there is no cached instance to release
+ try {
+ manager.release(SHARED_FOO, uncached);
+ fail("Should throw IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ ResourceInstance cached = manager.get(SHARED_FOO);
+ // An instance is cached now, but it is not the one being released
+ try {
+ manager.release(SHARED_FOO, uncached);
+ fail("Should throw IllegalArgumentException");
+ } catch (IllegalArgumentException e) {
+ // expected
+ }
+ manager.release(SHARED_FOO, cached);
+ }
+
+ @Test
+ public void overreleaseInstance() {
+ ResourceInstance foo1 = manager.get(SHARED_FOO);
+ manager.release(SHARED_FOO, foo1);
+ // The refcount is already zero, releasing once more must be rejected
+ try {
+ manager.release(SHARED_FOO, foo1);
+ fail("Should throw IllegalStateException");
+ } catch (IllegalStateException e) {
+ // expected
+ }
+ }
+
+ // Supplies mocked executors whose schedule() only records the task in scheduledDestroyTasks.
+ private class MockExecutorFactory implements Supplier<ScheduledExecutorService> {
+ @Override
+ public ScheduledExecutorService get() {
+ ScheduledExecutorService mockExecutor = mock(ScheduledExecutorService.class);
+ when(mockExecutor.schedule(any(Runnable.class), anyLong(), any(TimeUnit.class))).thenAnswer(
+ (Answer<MockScheduledFuture<Void>>) invocation -> {
+ Object[] args = invocation.getArguments();
+ Runnable command = (Runnable) args[0];
+ long delay = (Long) args[1];
+ TimeUnit unit = (TimeUnit) args[2];
+ MockScheduledFuture<Void> future = new MockScheduledFuture<Void>(
+ command, delay, unit);
+ scheduledDestroyTasks.add(future);
+ return future;
+ });
+ return mockExecutor;
+ }
+ }
+
+ // ScheduledFuture that never runs by itself; the test triggers the command through runTask().
+ private static class MockScheduledFuture<V> implements ScheduledFuture<V> {
+ private boolean cancelled;
+ private boolean finished;
+ final Runnable command;
+ final long delay;
+ final TimeUnit unit;
+
+ MockScheduledFuture(Runnable command, long delay, TimeUnit unit) {
+ this.command = command;
+ this.delay = delay;
+ this.unit = unit;
+ }
+
+ // Run the recorded command synchronously and mark the future as finished.
+ void runTask() {
+ command.run();
+ finished = true;
+ }
+
+ @Override
+ public boolean cancel(boolean interrupt) {
+ if (cancelled || finished) {
+ return false;
+ }
+ cancelled = true;
+ return true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled;
+ }
+
+ // Returns the delay the task was scheduled with, converted to the requested unit.
+ @Override
+ public long getDelay(TimeUnit targetUnit) {
+ return targetUnit.convert(this.delay, this.unit);
+ }
+
+ @Override
+ public int compareTo(Delayed o) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDone() {
+ return cancelled || finished;
+ }
+
+ @Override
+ public V get() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public V get(long timeout, TimeUnit unit) {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+}
--
To stop receiving notification emails like this one, please contact
['"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>'].