You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/09 01:28:03 UTC
[bookkeeper] 01/03: ISSUE #326: Replace observer/observable with a
simplified watcher/watchable implementation
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit 18b87b569c77da2d52ad8651e4a992be9cf5b0d7
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Dec 15 00:43:44 2017 -0800
ISSUE #326: Replace observer/observable with a simplified watcher/watchable implementation
Descriptions of the changes in this PR:
- long poll only need one-time notification on lac updates
- replace observer/observable with a simplified watcher/watchable implementation. watchers are removed after they are fired.
- add `RecycableHashSet` for `Watchable` keeping the list of watchers
- object pooling on `Watchable` and `LastAddConfirmedUpdateNotification`
Author: Sijie Guo <si...@apache.org>
Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>
This closes #838 from sijie/rxjava, closes #326
---
.travis.yml | 3 +
bookkeeper-common/pom.xml | 5 +
.../common/collections/RecyclableArrayList.java | 57 +++++++++
.../common/collections/package-info.java | 19 +--
.../apache/bookkeeper/common/util/Recyclable.java | 24 ++--
.../apache/bookkeeper/common/util/Watchable.java | 120 +++++++++++++++++++
.../org/apache/bookkeeper/common/util/Watcher.java | 30 ++---
.../collections/RecyclableArrayListTest.java | 38 +++---
.../bookkeeper/common/util/TestWatchable.java | 129 +++++++++++++++++++++
.../src/test/resources/log4j.properties | 42 +++++++
.../java/org/apache/bookkeeper/bookie/Bookie.java | 9 +-
.../org/apache/bookkeeper/bookie/FileInfo.java | 72 ++++++++----
.../bookkeeper/bookie/IndexPersistenceMgr.java | 11 +-
.../bookie/InterleavedLedgerStorage.java | 9 +-
.../java/org/apache/bookkeeper/bookie/Journal.java | 1 +
.../bookie/LastAddConfirmedUpdateNotification.java | 49 +++++++-
.../org/apache/bookkeeper/bookie/LedgerCache.java | 7 +-
.../apache/bookkeeper/bookie/LedgerCacheImpl.java | 10 +-
.../apache/bookkeeper/bookie/LedgerDescriptor.java | 7 +-
.../bookkeeper/bookie/LedgerDescriptorImpl.java | 8 +-
.../apache/bookkeeper/bookie/LedgerStorage.java | 11 +-
.../proto/LongPollReadEntryProcessorV3.java | 43 +++----
.../bookkeeper/bookie/IndexPersistenceMgrTest.java | 38 +++---
.../LastAddConfirmedUpdateNotificationTest.java | 62 ++++++++++
.../apache/bookkeeper/bookie/TestSyncThread.java | 17 +--
.../org/apache/bookkeeper/meta/GcLedgersTest.java | 12 +-
.../bookkeeper/meta/LedgerManagerTestCase.java | 11 +-
27 files changed, 663 insertions(+), 181 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index bea17d5..718d388 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,6 +27,9 @@ matrix:
osx_image: xcode8
- os: linux
env: CUSTOM_JDK="oraclejdk8"
+ - os: linux
+ dist: trusty
+ env: CUSTOM_JDK="openjdk8"
before_install:
- echo "MAVEN_OPTS='-Xmx3072m -XX:MaxPermSize=512m'" > ~/.mavenrc
diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 1e502ca..5398664 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -36,6 +36,11 @@
<version>${guava.version}</version>
</dependency>
<dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-common</artifactId>
+ <version>${netty.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>${google.code.version}</version>
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
new file mode 100644
index 0000000..7dd663f
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
@@ -0,0 +1,57 @@
+// Originally copied from netty project, version 4.1.17-Final, heavily modified
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.collections;
+
+import io.netty.util.Recycler.Handle;
+import java.util.ArrayList;
+
+/**
+ * A simple list which is recyclable.
+ */
+public final class RecyclableArrayList<T> extends ArrayList<T> {
+
+ private static final int DEFAULT_INITIAL_CAPACITY = 8;
+
+ /**
+ * An ArrayList recycler.
+ */
+ public static class Recycler<X>
+ extends io.netty.util.Recycler<RecyclableArrayList<X>> {
+ @Override
+ protected RecyclableArrayList<X> newObject(
+ Handle<RecyclableArrayList<X>> handle) {
+ return new RecyclableArrayList<X>(handle, DEFAULT_INITIAL_CAPACITY);
+ }
+
+ public RecyclableArrayList<X> newInstance() {
+ return get();
+ }
+ }
+
+ private final Handle<RecyclableArrayList<T>> handle;
+
+ private RecyclableArrayList(Handle<RecyclableArrayList<T>> handle, int initialCapacity) {
+ super(initialCapacity);
+ this.handle = handle;
+ }
+
+ public void recycle() {
+ clear();
+ handle.recycle(this);
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
similarity index 58%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
index a0c112d..0ed3c73 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +15,9 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.bookie;
/**
- * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
+ * Bookkeeper common collections.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
-
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
- }
-}
+package org.apache.bookkeeper.common.collections;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
similarity index 59%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
index a0c112d..edbaa34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +15,18 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.bookie;
+
+package org.apache.bookkeeper.common.util;
/**
- * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
+ * An interface represents an object that is recyclable.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
+public interface Recyclable {
+
+ /**
+ * Recycle the instance.
+ */
+ void recycle();
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
- }
}
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
new file mode 100644
index 0000000..a74f5f5
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.function.Function;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+
+/**
+ * This class represents an watchable object, or "data"
+ * in the model-view paradigm. It can be subclassed to represent an
+ * object that the application wants to have watched.
+ *
+ * <p>An watchable object can have one or more watchers. An watcher
+ * may be any object that implements interface <tt>Watcher</tt>. After an
+ * watchable instance changes, an application calling the
+ * <code>Watchable</code>'s <code>notifyWatchers</code> method
+ * causes all of its watchers to be notified of the change by a call
+ * to their <code>update</code> method.
+ *
+ * <p>A watcher is automatically removed from the watchers list once an event
+ * is fired to the watcher.
+ *
+ * <p>Note that this notification mechanism has nothing to do with threads
+ * and is completely separate from the <tt>wait</tt> and <tt>notify</tt>
+ * mechanism of class <tt>Object</tt>.
+ *
+ * <p>When an watchable object is newly created, its set of watchers is
+ * empty. If a same watcher is added multiple times to this watchable, it will
+ * receive the notifications multiple times.
+ */
+public class Watchable<T> implements Recyclable {
+
+ private final Recycler<Watcher<T>> recycler;
+ private RecyclableArrayList<Watcher<T>> watchers;
+
+ /** Construct an Watchable with zero watchers. */
+
+ public Watchable(Recycler<Watcher<T>> recycler) {
+ this.recycler = recycler;
+ this.watchers = recycler.newInstance();
+ }
+
+ synchronized int getNumWatchers() {
+ return this.watchers.size();
+ }
+
+ /**
+ * Adds an watcher to the set of watchers for this object, provided
+ * that it is not the same as some watcher already in the set.
+ * The order in which notifications will be delivered to multiple
+ * watchers is not specified. See the class comment.
+ *
+ * @param w an watcher to be added.
+ * @return true if a watcher is added to the list successfully, otherwise false.
+ * @throws NullPointerException if the parameter o is null.
+ */
+ public synchronized boolean addWatcher(Watcher<T> w) {
+ checkNotNull(w, "Null watcher is provided");
+ return watchers.add(w);
+ }
+
+ /**
+ * Deletes an watcher from the set of watcher of this object.
+ * Passing <CODE>null</CODE> to this method will have no effect.
+ * @param w the watcher to be deleted.
+ */
+ public synchronized boolean deleteWatcher(Watcher<T> w) {
+ return watchers.remove(w);
+ }
+
+ /**
+ * Notify the watchers with the update <i>value</i>.
+ *
+ * @param value value to notify
+ */
+ public <R> void notifyWatchers(Function<R, T> valueFn, R value) {
+ RecyclableArrayList<Watcher<T>> watchersLocal;
+ synchronized (this) {
+ watchersLocal = watchers;
+ watchers = recycler.newInstance();
+ }
+
+ for (Watcher<T> watcher : watchersLocal) {
+ watcher.update(valueFn.apply(value));
+ }
+ watchersLocal.recycle();
+ }
+
+ /**
+ * Clears the watcher list so that this object no longer has any watchers.
+ */
+ public synchronized void deleteWatchers() {
+ watchers.clear();
+ }
+
+ @Override
+ public synchronized void recycle() {
+ watchers.recycle();
+ }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
similarity index 59%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
index a0c112d..220a942 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +15,24 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.bookie;
+
+package org.apache.bookkeeper.common.util;
/**
- * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
+ * A class can implement the <code>Watcher</code> interface when it
+ * wants to be informed of <i>one-time</i> changes in watchable objects.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
+public interface Watcher<T> {
+
+ /**
+ * This method is called whenever the watched object is changed. An
+ * application calls an <tt>Watchable</tt> object's
+ * <code>notifyWatchers</code> method to have all the object's
+ * watchers notified of the change.
+ *
+ * @param value the updated value of a watchable
+ */
+ void update(T value);
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
- }
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
similarity index 53%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
index a0c112d..9037d21 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -16,21 +15,34 @@
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
*/
-package org.apache.bookkeeper.bookie;
+package org.apache.bookkeeper.common.collections;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+import org.junit.Test;
/**
- * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
+ * Unit test of {@link RecyclableArrayList}.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
+public class RecyclableArrayListTest {
+
+ private final Recycler<Integer> recycler;
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
+ public RecyclableArrayListTest() {
+ this.recycler = new Recycler<>();
}
+
+ @Test
+ public void testRecycle() {
+ RecyclableArrayList<Integer> array = recycler.newInstance();
+ for (int i = 0; i < 5; i++) {
+ array.add(i);
+ }
+ assertEquals(5, array.size());
+ array.recycle();
+ assertEquals(0, array.size());
+ }
+
}
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
new file mode 100644
index 0000000..697d0da
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.function.Function;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link Watchable}.
+ */
+public class TestWatchable {
+
+ private final Recycler<Watcher<Integer>> recycler;
+ private final Watchable<Integer> watchable;
+
+ public TestWatchable() {
+ this.recycler = new Recycler<>();
+ this.watchable = new Watchable<>(recycler);
+ }
+
+ @After
+ public void teardown() {
+ this.watchable.recycle();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAddWatcher() {
+ Watcher<Integer> watcher = mock(Watcher.class);
+ assertTrue(watchable.addWatcher(watcher));
+ assertEquals(1, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher, times(1)).update(eq(123));
+
+ // after the watcher is fired, watcher should be removed from watcher list.
+ assertEquals(0, watchable.getNumWatchers());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testDeleteWatcher() {
+ Watcher<Integer> watcher = mock(Watcher.class);
+ assertTrue(watchable.addWatcher(watcher));
+ assertEquals(1, watchable.getNumWatchers());
+ assertTrue(watchable.deleteWatcher(watcher));
+ assertEquals(0, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher, times(0)).update(anyInt());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testMultipleWatchers() {
+ Watcher<Integer> watcher1 = mock(Watcher.class);
+ Watcher<Integer> watcher2 = mock(Watcher.class);
+
+ assertTrue(watchable.addWatcher(watcher1));
+ assertTrue(watchable.addWatcher(watcher2));
+ assertEquals(2, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher1, times(1)).update(eq(123));
+ verify(watcher2, times(1)).update(eq(123));
+ assertEquals(0, watchable.getNumWatchers());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAddWatchMultipleTimes() {
+ Watcher<Integer> watcher = mock(Watcher.class);
+
+ int numTimes = 3;
+ for (int i = 0; i < numTimes; i++) {
+ assertTrue(watchable.addWatcher(watcher));
+ }
+ assertEquals(numTimes, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher, times(numTimes)).update(eq(123));
+ assertEquals(0, watchable.getNumWatchers());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testDeleteWatchers() {
+ Watcher<Integer> watcher1 = mock(Watcher.class);
+ Watcher<Integer> watcher2 = mock(Watcher.class);
+
+ assertTrue(watchable.addWatcher(watcher1));
+ assertTrue(watchable.addWatcher(watcher2));
+ assertEquals(2, watchable.getNumWatchers());
+ watchable.deleteWatchers();
+ assertEquals(0, watchable.getNumWatchers());
+
+ watchable.notifyWatchers(Function.identity(), 123);
+ verify(watcher1, times(0)).update(anyInt());
+ verify(watcher2, times(0)).update(anyInt());
+ }
+
+}
diff --git a/bookkeeper-common/src/test/resources/log4j.properties b/bookkeeper-common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..10ae6bf
--- /dev/null
+++ b/bookkeeper-common/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+
+
+# DEFAULT: console appender only, level INFO
+bookkeeper.root.logger=INFO,CONSOLE
+log4j.rootLogger=${bookkeeper.root.logger}
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.apache.bookkeeper.bookie=INFO
+log4j.logger.org.apache.bookkeeper.meta=INFO
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index c08ff46..074c29c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -53,8 +53,6 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -73,6 +71,7 @@ import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
import org.apache.bookkeeper.bookie.Journal.JournalScanner;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.discover.ZKRegistrationManager;
@@ -1433,10 +1432,12 @@ public class Bookie extends BookieCriticalThread {
return handle.getLastAddConfirmed();
}
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
- return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+ return handle.waitForLastAddConfirmedUpdate(previousLAC, watcher);
}
// The rest of the code is test stuff
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index c14be5f..38edbda 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -22,6 +22,7 @@
package org.apache.bookkeeper.bookie;
import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
@@ -32,9 +33,9 @@ import java.io.RandomAccessFile;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.common.util.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
* in entry loggers.
* </p>
*/
-class FileInfo extends Observable {
+class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
private static final Logger LOG = LoggerFactory.getLogger(FileInfo.class);
static final int NO_MASTER_KEY = -1;
@@ -93,8 +94,9 @@ class FileInfo extends Observable {
protected String mode;
public FileInfo(File lf, byte[] masterKey) throws IOException {
- this.lf = lf;
+ super(WATCHER_RECYCLER);
+ this.lf = lf;
this.masterKey = masterKey;
mode = "rw";
}
@@ -105,10 +107,11 @@ class FileInfo extends Observable {
long setLastAddConfirmed(long lac) {
long lacToReturn;
+ boolean changed = false;
synchronized (this) {
if (null == this.lac || this.lac < lac) {
this.lac = lac;
- setChanged();
+ changed = true;
}
lacToReturn = this.lac;
}
@@ -116,22 +119,24 @@ class FileInfo extends Observable {
LOG.trace("Updating LAC {} , {}", lacToReturn, lac);
}
-
- notifyObservers(new LastAddConfirmedUpdateNotification(lacToReturn));
+ if (changed) {
+ notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lacToReturn);
+ }
return lacToReturn;
}
- synchronized Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer observe) {
+ synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) {
if ((null != lac && lac > previousLAC)
|| isClosed || ((stateBits & STATE_FENCED_BIT) == STATE_FENCED_BIT)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Wait For LAC {} , {}", this.lac, previousLAC);
}
- return null;
+ return false;
}
- addObserver(observe);
- return this;
+ addWatcher(watcher);
+ return true;
}
public synchronized File getLf() {
@@ -283,6 +288,7 @@ class FileInfo extends Observable {
*/
public boolean setFenced() throws IOException {
boolean returnVal = false;
+ boolean changed = false;
synchronized (this) {
checkOpen(false);
if (LOG.isDebugEnabled()) {
@@ -293,12 +299,14 @@ class FileInfo extends Observable {
stateBits |= STATE_FENCED_BIT;
needFlushHeader = true;
synchronized (this) {
- setChanged();
+ changed = true;
}
returnVal = true;
}
}
- notifyObservers(new LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
+ if (changed) {
+ notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, Long.MAX_VALUE);
+ }
return returnVal;
}
@@ -379,20 +387,36 @@ class FileInfo extends Observable {
* if set to false, the index is not forced to create.
*/
public void close(boolean force) throws IOException {
- synchronized (this) {
- isClosed = true;
- checkOpen(force, true);
- // Any time when we force close a file, we should try to flush header. otherwise, we might lose fence bit.
- if (force) {
- flushHeader();
+ boolean closing = false;
+ try {
+ boolean changed = false;
+ synchronized (this) {
+ if (isClosed) {
+ return;
+ }
+ isClosed = true;
+ closing = true;
+ checkOpen(force, true);
+ // Any time when we force close a file, we should try to flush header.
+ // otherwise, we might lose fence bit.
+ if (force) {
+ flushHeader();
+ }
+ changed = true;
+ if (useCount.get() == 0 && fc != null) {
+ fc.close();
+ fc = null;
+ }
}
- setChanged();
- if (useCount.get() == 0 && fc != null) {
- fc.close();
- fc = null;
+ if (changed) {
+ notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, Long.MAX_VALUE);
+ }
+ } finally {
+ if (closing) {
+ // recycle this watchable after the FileInfo is closed.
+ recycle();
}
}
- notifyObservers(new LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
}
public synchronized long write(ByteBuffer[] buffs, long position) throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 3120fa4..8598bc4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -31,7 +31,6 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.UncheckedExecutionException;
-
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
@@ -40,17 +39,15 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
@@ -435,11 +432,13 @@ public class IndexPersistenceMgr {
}
}
- Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException {
+ boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
FileInfo fi = null;
try {
fi = getFileInfo(ledgerId, null);
- return fi.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+ return fi.waitForLastAddConfirmedUpdate(previousLAC, watcher);
} finally {
if (null != fi) {
fi.release();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6db3e6d..b654b0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -32,14 +32,13 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -251,9 +250,11 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
- return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer);
+ return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1a3635c..11468d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.stats.Counter;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
index a0c112d..71cbd61 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
@@ -20,17 +20,54 @@
*/
package org.apache.bookkeeper.bookie;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.util.function.Function;
+import lombok.Getter;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.util.Recyclable;
+import org.apache.bookkeeper.common.util.Watcher;
+
/**
* A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
*
* <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
*/
-public class LastAddConfirmedUpdateNotification {
- public long lastAddConfirmed;
- public long timestamp;
+@Getter
+public class LastAddConfirmedUpdateNotification implements Recyclable {
+
+ public static final Function<Long, LastAddConfirmedUpdateNotification> FUNC = lac -> of(lac);
+
+ public static final RecyclableArrayList.Recycler<Watcher<LastAddConfirmedUpdateNotification>> WATCHER_RECYCLER =
+ new RecyclableArrayList.Recycler<>();
+
+ public static LastAddConfirmedUpdateNotification of(long lastAddConfirmed) {
+ LastAddConfirmedUpdateNotification lac = RECYCLER.get();
+ lac.lastAddConfirmed = lastAddConfirmed;
+ lac.timestamp = System.currentTimeMillis();
+ return lac;
+ }
+
+ private static final Recycler<LastAddConfirmedUpdateNotification> RECYCLER =
+ new Recycler<LastAddConfirmedUpdateNotification>() {
+ @Override
+ protected LastAddConfirmedUpdateNotification newObject(Handle<LastAddConfirmedUpdateNotification> handle) {
+ return new LastAddConfirmedUpdateNotification(handle);
+ }
+ };
+
+ private final Handle<LastAddConfirmedUpdateNotification> handle;
+ private long lastAddConfirmed;
+ private long timestamp;
+
+ public LastAddConfirmedUpdateNotification(Handle<LastAddConfirmedUpdateNotification> handle) {
+ this.handle = handle;
+ }
- public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
- this.lastAddConfirmed = lastAddConfirmed;
- this.timestamp = System.currentTimeMillis();
+ @Override
+ public void recycle() {
+ this.lastAddConfirmed = -1L;
+ this.timestamp = -1L;
+ handle.recycle(this);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 26d5245..14d4825 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -24,8 +24,7 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
/**
* This class maps a ledger entry number into a location (entrylogid, offset) in
@@ -49,7 +48,9 @@ interface LedgerCache extends Closeable {
Long getLastAddConfirmed(long ledgerId) throws IOException;
long updateLastAddConfirmed(long ledgerId, long lac) throws IOException;
- Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException;
+ boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
void deleteLedger(long ledgerId) throws IOException;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 0b63e32..1db7d47 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -23,8 +23,7 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -85,12 +84,13 @@ public class LedgerCacheImpl implements LedgerCache {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
throws IOException {
- return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer);
+ return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
}
-
@Override
public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
indexPageManager.putEntryOffset(ledger, entry, offset);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 970cec0..f4db7a6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -27,8 +27,7 @@ import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
/**
* Implements a ledger inside a bookie. In particular, it implements operations
@@ -78,7 +77,9 @@ public abstract class LedgerDescriptor {
abstract ByteBuf readEntry(long entryId) throws IOException;
abstract long getLastAddConfirmed() throws IOException;
- abstract Observable waitForLastAddConfirmedUpdate(long previoisLAC, Observer observer) throws IOException;
+ abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException;
abstract void setExplicitLac(ByteBuf entry) throws IOException;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index af55f6a..d126c4c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -25,9 +25,8 @@ import com.google.common.util.concurrent.SettableFuture;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.Arrays;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.common.util.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -156,7 +155,8 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
}
@Override
- Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer observer) throws IOException {
- return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, observer);
+ boolean waitForLastAddConfirmedUpdate(long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+ return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
}
}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index d2055a8..3e2f7dc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -23,9 +23,8 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -119,12 +118,14 @@ public interface LedgerStorage {
/**
* Wait for last add confirmed update.
*
- * @param previoisLAC - The threshold beyond which we would wait for the update
- * @param observer - Observer to notify on update
+ * @param previousLAC - The threshold beyond which we would wait for the update
+ * @param watcher - Watcher to notify on update
* @return
* @throws IOException
*/
- Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException;
+ boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
/**
* Flushes all data in the storage. Once this is called,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index 342e788..f60dac7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -22,16 +22,14 @@ import com.google.common.base.Stopwatch;
import io.netty.channel.Channel;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
@@ -41,7 +39,7 @@ import org.slf4j.LoggerFactory;
/**
* Processor handling long poll read entry request.
*/
-class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Observer {
+class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watcher<LastAddConfirmedUpdateNotification> {
private final static Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
@@ -141,9 +139,9 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser
final Stopwatch startTimeSw = Stopwatch.createStarted();
- final Observable observable;
+ final boolean watched;
try {
- observable = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
+ watched = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
} catch (Bookie.NoLedgerException e) {
logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.",
ledgerId, previousLAC);
@@ -157,19 +155,16 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser
registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw);
lastPhaseStartTime.reset().start();
- if (null != observable) {
- // successfully registered observable to lac updates
+ if (watched) {
+ // successfully registered watcher to lac updates
if (logger.isTraceEnabled()) {
logger.trace("Waiting For LAC Update {}: Timeout {}", previousLAC, readRequest.getTimeOut());
}
synchronized (this) {
- expirationTimerTask = requestTimer.newTimeout(new TimerTask() {
- @Override
- public void run(Timeout timeout) throws Exception {
- // When the timeout expires just get whatever is the current
- // readLastConfirmed
- LongPollReadEntryProcessorV3.this.scheduleDeferredRead(observable, true);
- }
+ expirationTimerTask = requestTimer.newTimeout(timeout -> {
+ // When the timeout expires just get whatever is the current
+ // readLastConfirmed
+ LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
}, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
}
return null;
@@ -188,27 +183,25 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser
}
@Override
- public void update(Observable observable, Object o) {
- LastAddConfirmedUpdateNotification newLACNotification = (LastAddConfirmedUpdateNotification)o;
- if (newLACNotification.lastAddConfirmed > previousLAC) {
- if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE &&
- !lastAddConfirmedUpdateTime.isPresent()) {
- lastAddConfirmedUpdateTime = Optional.of(newLACNotification.timestamp);
+ public void update(LastAddConfirmedUpdateNotification newLACNotification) {
+ if (newLACNotification.getLastAddConfirmed() > previousLAC) {
+ if (newLACNotification.getLastAddConfirmed() != Long.MAX_VALUE && !lastAddConfirmedUpdateTime.isPresent()) {
+ lastAddConfirmedUpdateTime = Optional.of(newLACNotification.getTimestamp());
}
if (logger.isTraceEnabled()) {
logger.trace("Last Add Confirmed Advanced to {} for request {}",
- newLACNotification.lastAddConfirmed, request);
+ newLACNotification.getLastAddConfirmed(), request);
}
- scheduleDeferredRead(observable, false);
+ scheduleDeferredRead(false);
}
+ newLACNotification.recycle();
}
- private synchronized void scheduleDeferredRead(Observable observable, boolean timeout) {
+ private synchronized void scheduleDeferredRead(boolean timeout) {
if (null == deferredTask) {
if (logger.isTraceEnabled()) {
logger.trace("Deferred Task, expired: {}, request: {}", timeout, request);
}
- observable.deleteObserver(this);
try {
shouldReadEntry = true;
deferredTask = longPollThreadPool.submit(this);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 1c63d39..a179745 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -20,6 +20,15 @@
*/
package org.apache.bookkeeper.bookie;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.DiskChecker;
@@ -31,13 +40,6 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.util.Observable;
-import java.util.Observer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
/**
* Test cases for IndexPersistenceMgr
*/
@@ -235,29 +237,27 @@ public class IndexPersistenceMgrTest {
@Test
public void testEvictionShouldNotAffectLongPollRead() throws Exception {
IndexPersistenceMgr indexPersistenceMgr = null;
- Observer observer = (obs, obj) -> {
- //no-ops
- };
+ Watcher<LastAddConfirmedUpdateNotification> watcher = notification -> notification.recycle();
try {
indexPersistenceMgr = createIndexPersistenceManager(1);
indexPersistenceMgr.getFileInfo(lid, masterKey);
indexPersistenceMgr.getFileInfo(lid, null);
indexPersistenceMgr.updateLastAddConfirmed(lid, 1);
- Observable observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
- // observer shouldn't be null because ledger is not evicted or closed
- assertNotNull("Observer should not be null", observable);
+ // watch should succeed because ledger is not evicted or closed
+ assertTrue(
+ indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
// now evict ledger 1 from write cache
indexPersistenceMgr.getFileInfo(lid + 1, masterKey);
- observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
- // even if ledger 1 is evicted from write cache, observer still shouldn't be null
- assertNotNull("Observer should not be null", observable);
+ // even if ledger 1 is evicted from write cache, watcher should still succeed
+ assertTrue(
+ indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
// now evict ledger 1 from read cache
indexPersistenceMgr.getFileInfo(lid + 2, masterKey);
indexPersistenceMgr.getFileInfo(lid + 2, null);
- // even if ledger 1 is evicted from both cache, observer still shouldn't be null because it
+ // even if ledger 1 is evicted from both cache, watcher should still succeed because it
// will create a new FileInfo when cache miss
- observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
- assertNotNull("Observer should not be null", observable);
+ assertTrue(
+ indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
} finally {
if (null != indexPersistenceMgr) {
indexPersistenceMgr.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
new file mode 100644
index 0000000..90da463
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Unit test of {@link LastAddConfirmedUpdateNotification}.
+ */
+public class LastAddConfirmedUpdateNotificationTest {
+
+ @Test
+ public void testGetters() {
+ long lac = System.currentTimeMillis();
+ LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.of(lac);
+
+ long timestamp = System.currentTimeMillis();
+ assertEquals(lac, notification.getLastAddConfirmed());
+ assertTrue(notification.getTimestamp() <= timestamp);
+
+ notification.recycle();
+ }
+
+ @Test
+ public void testRecycle() {
+ long lac = System.currentTimeMillis();
+ LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.of(lac);
+ notification.recycle();
+
+ assertEquals(-1L, notification.getLastAddConfirmed());
+ assertEquals(-1L, notification.getTimestamp());
+ }
+
+ @Test
+ public void testFunc() {
+ long lac = System.currentTimeMillis();
+ LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.FUNC.apply(lac);
+
+ assertEquals(lac, notification.getLastAddConfirmed());
+ }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 5f30557..d02f067 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -23,8 +23,7 @@ package org.apache.bookkeeper.bookie;
import io.netty.buffer.ByteBuf;
import java.io.File;
import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Callable;
@@ -33,13 +32,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.conf.TestBKConfiguration;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
@@ -341,8 +341,11 @@ public class TestSyncThread {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException {
- return null;
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ return false;
}
@Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index ad21fc1..8cec701 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,7 +21,6 @@
package org.apache.bookkeeper.meta;
-
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -37,8 +36,6 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
@@ -55,11 +52,13 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollector;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
@@ -425,8 +424,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException {
- return null;
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ return false;
}
}
}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 5d357c3..492320c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -27,8 +27,6 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -36,7 +34,9 @@ import org.apache.bookkeeper.bookie.Checkpointer;
import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.common.util.Watcher;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -210,8 +210,11 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
}
@Override
- public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException {
- return null;
+ public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+ long previousLAC,
+ Watcher<LastAddConfirmedUpdateNotification> watcher)
+ throws IOException {
+ return false;
}
@Override
--
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.