You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2018/01/09 01:28:03 UTC

[bookkeeper] 01/03: ISSUE #326: Replace observer/observable with a simplified watcher/watchable implementation

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-4.6
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 18b87b569c77da2d52ad8651e4a992be9cf5b0d7
Author: Sijie Guo <si...@apache.org>
AuthorDate: Fri Dec 15 00:43:44 2017 -0800

    ISSUE #326: Replace observer/observable with a simplified watcher/watchable implementation
    
    Descriptions of the changes in this PR:
    
    - long poll only needs a one-time notification on LAC updates
    - replace observer/observable with a simplified watcher/watchable implementation. watchers are removed after they are fired.
    - add `RecyclableArrayList` for `Watchable` keeping the list of watchers
    - object pooling on `Watchable` and `LastAddConfirmedUpdateNotification`
    
    Author: Sijie Guo <si...@apache.org>
    
    Reviewers: Ivan Kelly <iv...@apache.org>, Enrico Olivelli <eo...@gmail.com>
    
    This closes #838 from sijie/rxjava, closes #326
---
 .travis.yml                                        |   3 +
 bookkeeper-common/pom.xml                          |   5 +
 .../common/collections/RecyclableArrayList.java    |  57 +++++++++
 .../common/collections/package-info.java           |  19 +--
 .../apache/bookkeeper/common/util/Recyclable.java  |  24 ++--
 .../apache/bookkeeper/common/util/Watchable.java   | 120 +++++++++++++++++++
 .../org/apache/bookkeeper/common/util/Watcher.java |  30 ++---
 .../collections/RecyclableArrayListTest.java       |  38 +++---
 .../bookkeeper/common/util/TestWatchable.java      | 129 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  42 +++++++
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |   9 +-
 .../org/apache/bookkeeper/bookie/FileInfo.java     |  72 ++++++++----
 .../bookkeeper/bookie/IndexPersistenceMgr.java     |  11 +-
 .../bookie/InterleavedLedgerStorage.java           |   9 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |   1 +
 .../bookie/LastAddConfirmedUpdateNotification.java |  49 +++++++-
 .../org/apache/bookkeeper/bookie/LedgerCache.java  |   7 +-
 .../apache/bookkeeper/bookie/LedgerCacheImpl.java  |  10 +-
 .../apache/bookkeeper/bookie/LedgerDescriptor.java |   7 +-
 .../bookkeeper/bookie/LedgerDescriptorImpl.java    |   8 +-
 .../apache/bookkeeper/bookie/LedgerStorage.java    |  11 +-
 .../proto/LongPollReadEntryProcessorV3.java        |  43 +++----
 .../bookkeeper/bookie/IndexPersistenceMgrTest.java |  38 +++---
 .../LastAddConfirmedUpdateNotificationTest.java    |  62 ++++++++++
 .../apache/bookkeeper/bookie/TestSyncThread.java   |  17 +--
 .../org/apache/bookkeeper/meta/GcLedgersTest.java  |  12 +-
 .../bookkeeper/meta/LedgerManagerTestCase.java     |  11 +-
 27 files changed, 663 insertions(+), 181 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index bea17d5..718d388 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -27,6 +27,9 @@ matrix:
       osx_image: xcode8
     - os: linux
       env: CUSTOM_JDK="oraclejdk8"
+    - os: linux
+      dist: trusty
+      env: CUSTOM_JDK="openjdk8"
 
 before_install:
   - echo "MAVEN_OPTS='-Xmx3072m -XX:MaxPermSize=512m'" > ~/.mavenrc
diff --git a/bookkeeper-common/pom.xml b/bookkeeper-common/pom.xml
index 1e502ca..5398664 100644
--- a/bookkeeper-common/pom.xml
+++ b/bookkeeper-common/pom.xml
@@ -36,6 +36,11 @@
       <version>${guava.version}</version>
     </dependency>
     <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-common</artifactId>
+      <version>${netty.version}</version>
+    </dependency>
+    <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>
       <version>${google.code.version}</version>
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
new file mode 100644
index 0000000..7dd663f
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/RecyclableArrayList.java
@@ -0,0 +1,57 @@
+// Originally copied from netty project, version 4.1.17-Final, heavily modified
+/*
+ * Copyright 2013 The Netty Project
+ *
+ * The Netty Project licenses this file to you under the Apache License,
+ * version 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at:
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.collections;
+
+import io.netty.util.Recycler.Handle;
+import java.util.ArrayList;
+
+/**
+ * A simple list which is recyclable.
+ */
+public final class RecyclableArrayList<T> extends ArrayList<T> {
+
+    private static final int DEFAULT_INITIAL_CAPACITY = 8;
+
+    /**
+     * An ArrayList recycler.
+     */
+    public static class Recycler<X>
+        extends io.netty.util.Recycler<RecyclableArrayList<X>> {
+        @Override
+        protected RecyclableArrayList<X> newObject(
+                Handle<RecyclableArrayList<X>> handle) {
+            return new RecyclableArrayList<X>(handle, DEFAULT_INITIAL_CAPACITY);
+        }
+
+        public RecyclableArrayList<X> newInstance() {
+            return get();
+        }
+    }
+
+    private final Handle<RecyclableArrayList<T>> handle;
+
+    private RecyclableArrayList(Handle<RecyclableArrayList<T>> handle, int initialCapacity) {
+        super(initialCapacity);
+        this.handle = handle;
+    }
+
+    public void recycle() {
+        clear();
+        handle.recycle(this);
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
similarity index 58%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
index a0c112d..0ed3c73 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/collections/package-info.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,21 +15,9 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-package org.apache.bookkeeper.bookie;
 
 /**
- * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
+ * Bookkeeper common collections.
  */
-public class LastAddConfirmedUpdateNotification {
-    public long lastAddConfirmed;
-    public long timestamp;
-
-    public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
-        this.lastAddConfirmed = lastAddConfirmed;
-        this.timestamp = System.currentTimeMillis();
-    }
-}
+package org.apache.bookkeeper.common.collections;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
similarity index 59%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
index a0c112d..edbaa34 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Recyclable.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,21 +15,18 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-package org.apache.bookkeeper.bookie;
+
+package org.apache.bookkeeper.common.util;
 
 /**
- * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
+ * An interface represents an object that is recyclable.
  */
-public class LastAddConfirmedUpdateNotification {
-    public long lastAddConfirmed;
-    public long timestamp;
+public interface Recyclable {
+
+    /**
+     * Recycle the instance.
+     */
+    void recycle();
 
-    public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
-        this.lastAddConfirmed = lastAddConfirmed;
-        this.timestamp = System.currentTimeMillis();
-    }
 }
diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
new file mode 100644
index 0000000..a74f5f5
--- /dev/null
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watchable.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.function.Function;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+
+/**
+ * This class represents a watchable object, or "data"
+ * in the model-view paradigm. It can be subclassed to represent an
+ * object that the application wants to have watched.
+ *
+ * <p>A watchable object can have one or more watchers. A watcher
+ * may be any object that implements interface <tt>Watcher</tt>. After a
+ * watchable instance changes, an application calling the
+ * <code>Watchable</code>'s <code>notifyWatchers</code> method
+ * causes all of its watchers to be notified of the change by a call
+ * to their <code>update</code> method.
+ *
+ * <p>A watcher is automatically removed from the watchers list once an event
+ * is fired to the watcher.
+ *
+ * <p>Note that this notification mechanism has nothing to do with threads
+ * and is completely separate from the <tt>wait</tt> and <tt>notify</tt>
+ * mechanism of class <tt>Object</tt>.
+ *
+ * <p>When a watchable object is newly created, its list of watchers is
+ * empty. If the same watcher is added multiple times to this watchable, it
+ * will receive the notifications multiple times.
+ */
+public class Watchable<T> implements Recyclable {
+
+    private final Recycler<Watcher<T>> recycler;
+    private RecyclableArrayList<Watcher<T>> watchers;
+
+    /** Construct a Watchable with zero watchers; watcher lists are obtained from the given recycler. */
+
+    public Watchable(Recycler<Watcher<T>> recycler) {
+        this.recycler = recycler;
+        this.watchers = recycler.newInstance();
+    }
+
+    synchronized int getNumWatchers() {
+        return this.watchers.size();
+    }
+
+    /**
+     * Adds a watcher to the list of watchers for this object. Duplicates are
+     * not filtered: a watcher added multiple times is notified multiple times.
+     * The order in which notifications will be delivered to multiple
+     * watchers is not specified. See the class comment.
+     *
+     * @param  w a watcher to be added.
+     * @return true if a watcher is added to the list successfully, otherwise false.
+     * @throws NullPointerException   if the parameter w is null.
+     */
+    public synchronized boolean addWatcher(Watcher<T> w) {
+        checkNotNull(w, "Null watcher is provided");
+        return watchers.add(w);
+    }
+
+    /**
+     * Deletes a watcher from the list of watchers of this object.
+     * Passing <CODE>null</CODE> to this method will have no effect.
+     * @param w the watcher to be deleted.
+     */
+    public synchronized boolean deleteWatcher(Watcher<T> w) {
+        return watchers.remove(w);
+    }
+
+    /**
+     * Notify the watchers with the update obtained by applying <i>valueFn</i> to <i>value</i>.
+     *
+     * @param value value to notify
+     */
+    public <R> void notifyWatchers(Function<R, T> valueFn, R value) {
+        RecyclableArrayList<Watcher<T>> watchersLocal;
+        synchronized (this) {
+            watchersLocal = watchers;
+            watchers = recycler.newInstance();
+        }
+
+        for (Watcher<T> watcher : watchersLocal) {
+            watcher.update(valueFn.apply(value));
+        }
+        watchersLocal.recycle();
+    }
+
+    /**
+     * Clears the watcher list so that this object no longer has any watchers.
+     */
+    public synchronized void deleteWatchers() {
+        watchers.clear();
+    }
+
+    @Override
+    public synchronized void recycle() {
+        watchers.recycle();
+    }
+}
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
similarity index 59%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
index a0c112d..220a942 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/Watcher.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,21 +15,24 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-package org.apache.bookkeeper.bookie;
+
+package org.apache.bookkeeper.common.util;
 
 /**
- * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
+ * A class can implement the <code>Watcher</code> interface when it
+ * wants to be informed of <i>one-time</i> changes in watchable objects.
  */
-public class LastAddConfirmedUpdateNotification {
-    public long lastAddConfirmed;
-    public long timestamp;
+public interface Watcher<T> {
+
+    /**
+     * This method is called whenever the watched object is changed. An
+     * application calls an <tt>Watchable</tt> object's
+     * <code>notifyWatchers</code> method to have all the object's
+     * watchers notified of the change.
+     *
+     * @param value the updated value of a watchable
+     */
+    void update(T value);
 
-    public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
-        this.lastAddConfirmed = lastAddConfirmed;
-        this.timestamp = System.currentTimeMillis();
-    }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
similarity index 53%
copy from bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
copy to bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
index a0c112d..9037d21 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/collections/RecyclableArrayListTest.java
@@ -1,5 +1,4 @@
-/**
- *
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,21 +15,34 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
-package org.apache.bookkeeper.bookie;
+package org.apache.bookkeeper.common.collections;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+import org.junit.Test;
 
 /**
- * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
- *
- * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
+ * Unit test of {@link RecyclableArrayList}.
  */
-public class LastAddConfirmedUpdateNotification {
-    public long lastAddConfirmed;
-    public long timestamp;
+public class RecyclableArrayListTest {
+
+    private final Recycler<Integer> recycler;
 
-    public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
-        this.lastAddConfirmed = lastAddConfirmed;
-        this.timestamp = System.currentTimeMillis();
+    public RecyclableArrayListTest() {
+        this.recycler = new Recycler<>();
     }
+
+    @Test
+    public void testRecycle() {
+        RecyclableArrayList<Integer> array = recycler.newInstance();
+        for (int i = 0; i < 5; i++) {
+            array.add(i);
+        }
+        assertEquals(5, array.size());
+        array.recycle();
+        assertEquals(0, array.size());
+    }
+
 }
diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
new file mode 100644
index 0000000..697d0da
--- /dev/null
+++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestWatchable.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.common.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.function.Function;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList.Recycler;
+import org.junit.After;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link Watchable}.
+ */
+public class TestWatchable {
+
+    private final Recycler<Watcher<Integer>> recycler;
+    private final Watchable<Integer> watchable;
+
+    public TestWatchable() {
+        this.recycler = new Recycler<>();
+        this.watchable = new Watchable<>(recycler);
+    }
+
+    @After
+    public void teardown() {
+        this.watchable.recycle();
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAddWatcher() {
+        Watcher<Integer> watcher = mock(Watcher.class);
+        assertTrue(watchable.addWatcher(watcher));
+        assertEquals(1, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher, times(1)).update(eq(123));
+
+        // after the watcher is fired, watcher should be removed from watcher list.
+        assertEquals(0, watchable.getNumWatchers());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testDeleteWatcher() {
+        Watcher<Integer> watcher = mock(Watcher.class);
+        assertTrue(watchable.addWatcher(watcher));
+        assertEquals(1, watchable.getNumWatchers());
+        assertTrue(watchable.deleteWatcher(watcher));
+        assertEquals(0, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher, times(0)).update(anyInt());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testMultipleWatchers() {
+        Watcher<Integer> watcher1 = mock(Watcher.class);
+        Watcher<Integer> watcher2 = mock(Watcher.class);
+
+        assertTrue(watchable.addWatcher(watcher1));
+        assertTrue(watchable.addWatcher(watcher2));
+        assertEquals(2, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher1, times(1)).update(eq(123));
+        verify(watcher2, times(1)).update(eq(123));
+        assertEquals(0, watchable.getNumWatchers());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testAddWatchMultipleTimes() {
+        Watcher<Integer> watcher = mock(Watcher.class);
+
+        int numTimes = 3;
+        for (int i = 0; i < numTimes; i++) {
+            assertTrue(watchable.addWatcher(watcher));
+        }
+        assertEquals(numTimes, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher, times(numTimes)).update(eq(123));
+        assertEquals(0, watchable.getNumWatchers());
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testDeleteWatchers() {
+        Watcher<Integer> watcher1 = mock(Watcher.class);
+        Watcher<Integer> watcher2 = mock(Watcher.class);
+
+        assertTrue(watchable.addWatcher(watcher1));
+        assertTrue(watchable.addWatcher(watcher2));
+        assertEquals(2, watchable.getNumWatchers());
+        watchable.deleteWatchers();
+        assertEquals(0, watchable.getNumWatchers());
+
+        watchable.notifyWatchers(Function.identity(), 123);
+        verify(watcher1, times(0)).update(anyInt());
+        verify(watcher2, times(0)).update(anyInt());
+    }
+
+}
diff --git a/bookkeeper-common/src/test/resources/log4j.properties b/bookkeeper-common/src/test/resources/log4j.properties
new file mode 100644
index 0000000..10ae6bf
--- /dev/null
+++ b/bookkeeper-common/src/test/resources/log4j.properties
@@ -0,0 +1,42 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+#
+
+#
+# Bookkeeper Logging Configuration
+#
+
+# Format is "<default threshold> (, <appender>)+"
+
+# DEFAULT: console appender only, level INFO
+bookkeeper.root.logger=INFO,CONSOLE
+log4j.rootLogger=${bookkeeper.root.logger}
+
+#
+# Log INFO level and above messages to the console
+#
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+log4j.logger.org.apache.bookkeeper.bookie=INFO
+log4j.logger.org.apache.bookkeeper.meta=INFO
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index c08ff46..074c29c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -53,8 +53,6 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -73,6 +71,7 @@ import org.apache.bookkeeper.bookie.BookieException.UnknownBookieIdException;
 import org.apache.bookkeeper.bookie.Journal.JournalScanner;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.discover.RegistrationManager;
 import org.apache.bookkeeper.discover.ZKRegistrationManager;
@@ -1433,10 +1432,12 @@ public class Bookie extends BookieCriticalThread {
         return handle.getLastAddConfirmed();
     }
 
-    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                 long previousLAC,
+                                                 Watcher<LastAddConfirmedUpdateNotification> watcher)
             throws IOException {
         LedgerDescriptor handle = handles.getReadOnlyHandle(ledgerId);
-        return handle.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+        return handle.waitForLastAddConfirmedUpdate(previousLAC, watcher);
     }
 
     // The rest of the code is test stuff
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
index c14be5f..38edbda 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/FileInfo.java
@@ -22,6 +22,7 @@
 package org.apache.bookkeeper.bookie;
 
 import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification.WATCHER_RECYCLER;
 
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.buffer.ByteBuf;
@@ -32,9 +33,9 @@ import java.io.RandomAccessFile;
 import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.bookkeeper.common.util.Watchable;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,7 +59,7 @@ import org.slf4j.LoggerFactory;
  * in entry loggers.
  * </p>
  */
-class FileInfo extends Observable {
+class FileInfo extends Watchable<LastAddConfirmedUpdateNotification> {
     private static final Logger LOG = LoggerFactory.getLogger(FileInfo.class);
 
     static final int NO_MASTER_KEY = -1;
@@ -93,8 +94,9 @@ class FileInfo extends Observable {
     protected String mode;
 
     public FileInfo(File lf, byte[] masterKey) throws IOException {
-        this.lf = lf;
+        super(WATCHER_RECYCLER);
 
+        this.lf = lf;
         this.masterKey = masterKey;
         mode = "rw";
     }
@@ -105,10 +107,11 @@ class FileInfo extends Observable {
 
     long setLastAddConfirmed(long lac) {
         long lacToReturn;
+        boolean changed = false;
         synchronized (this) {
             if (null == this.lac || this.lac < lac) {
                 this.lac = lac;
-                setChanged();
+                changed = true;
             }
             lacToReturn = this.lac;
         }
@@ -116,22 +119,24 @@ class FileInfo extends Observable {
             LOG.trace("Updating LAC {} , {}", lacToReturn, lac);
         }
 
-
-        notifyObservers(new LastAddConfirmedUpdateNotification(lacToReturn));
+        if (changed) {
+            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, lacToReturn);
+        }
         return lacToReturn;
     }
 
-    synchronized Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer observe) {
+    synchronized boolean waitForLastAddConfirmedUpdate(long previousLAC,
+                                                       Watcher<LastAddConfirmedUpdateNotification> watcher) {
         if ((null != lac && lac > previousLAC)
                 || isClosed || ((stateBits & STATE_FENCED_BIT) == STATE_FENCED_BIT)) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Wait For LAC {} , {}", this.lac, previousLAC);
             }
-            return null;
+            return false;
         }
 
-        addObserver(observe);
-        return this;
+        addWatcher(watcher);
+        return true;
     }
 
     public synchronized File getLf() {
@@ -283,6 +288,7 @@ class FileInfo extends Observable {
      */
     public boolean setFenced() throws IOException {
         boolean returnVal = false;
+        boolean changed = false;
         synchronized (this) {
             checkOpen(false);
             if (LOG.isDebugEnabled()) {
@@ -293,12 +299,14 @@ class FileInfo extends Observable {
                 stateBits |= STATE_FENCED_BIT;
                 needFlushHeader = true;
                 synchronized (this) {
-                    setChanged();
+                    changed = true;
                 }
                 returnVal = true;
             }
         }
-        notifyObservers(new LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
+        if (changed) {
+            notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, Long.MAX_VALUE);
+        }
         return returnVal;
     }
 
@@ -379,20 +387,36 @@ class FileInfo extends Observable {
      *          if set to false, the index is not forced to create.
      */
     public void close(boolean force) throws IOException {
-        synchronized (this) {
-            isClosed = true;
-            checkOpen(force, true);
-            // Any time when we force close a file, we should try to flush header. otherwise, we might lose fence bit.
-            if (force) {
-                flushHeader();
+        boolean closing = false;
+        try {
+            boolean changed = false;
+            synchronized (this) {
+                if (isClosed) {
+                    return;
+                }
+                isClosed = true;
+                closing = true;
+                checkOpen(force, true);
+                // Any time when we force close a file, we should try to flush header.
+                // otherwise, we might lose fence bit.
+                if (force) {
+                    flushHeader();
+                }
+                changed = true;
+                if (useCount.get() == 0 && fc != null) {
+                    fc.close();
+                    fc = null;
+                }
             }
-            setChanged();
-            if (useCount.get() == 0 && fc != null) {
-                fc.close();
-                fc = null;
+            if (changed) {
+                notifyWatchers(LastAddConfirmedUpdateNotification.FUNC, Long.MAX_VALUE);
+            }
+        } finally {
+            if (closing) {
+                // recycle this watchable after the FileInfo is closed.
+                recycle();
             }
         }
-        notifyObservers(new LastAddConfirmedUpdateNotification(Long.MAX_VALUE));
     }
 
     public synchronized long write(ByteBuffer[] buffs, long position) throws IOException {
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
index 3120fa4..8598bc4 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexPersistenceMgr.java
@@ -31,7 +31,6 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.UncheckedExecutionException;
-
 import io.netty.buffer.ByteBuf;
 import java.io.File;
 import java.io.IOException;
@@ -40,17 +39,15 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
@@ -435,11 +432,13 @@ public class IndexPersistenceMgr {
         }
     }
 
-    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException {
+    boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                          long previousLAC,
+                                          Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
         FileInfo fi = null;
         try {
             fi = getFileInfo(ledgerId, null);
-            return fi.waitForLastAddConfirmedUpdate(previoisLAC, observer);
+            return fi.waitForLastAddConfirmedUpdate(previousLAC, watcher);
         } finally {
             if (null != fi) {
                 fi.release();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
index 6db3e6d..b654b0e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/InterleavedLedgerStorage.java
@@ -32,14 +32,13 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.Bookie.NoLedgerException;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.EntryLogger.EntryLogListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -251,9 +250,11 @@ public class InterleavedLedgerStorage implements CompactableLedgerStorage, Entry
     }
 
     @Override
-    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                 long previousLAC,
+                                                 Watcher<LastAddConfirmedUpdateNotification> watcher)
             throws IOException {
-        return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer);
+        return ledgerCache.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index 1a3635c..11468d1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -38,6 +38,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.stats.Counter;
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
index a0c112d..71cbd61 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotification.java
@@ -20,17 +20,54 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import java.util.function.Function;
+import lombok.Getter;
+import org.apache.bookkeeper.common.collections.RecyclableArrayList;
+import org.apache.bookkeeper.common.util.Recyclable;
+import org.apache.bookkeeper.common.util.Watcher;
+
 /**
  * A signal object is used for notifying the observers when the {@code LastAddConfirmed} is advanced.
  *
  * <p>The signal object contains the latest {@code LastAddConfirmed} and when the {@code LastAddConfirmed} is advanced.
  */
-public class LastAddConfirmedUpdateNotification {
-    public long lastAddConfirmed;
-    public long timestamp;
+@Getter
+public class LastAddConfirmedUpdateNotification implements Recyclable {
+
+    public static final Function<Long, LastAddConfirmedUpdateNotification> FUNC = lac -> of(lac);
+
+    public static final RecyclableArrayList.Recycler<Watcher<LastAddConfirmedUpdateNotification>> WATCHER_RECYCLER =
+        new RecyclableArrayList.Recycler<>();
+
+    public static LastAddConfirmedUpdateNotification of(long lastAddConfirmed) {
+        LastAddConfirmedUpdateNotification lac = RECYCLER.get();
+        lac.lastAddConfirmed = lastAddConfirmed;
+        lac.timestamp = System.currentTimeMillis();
+        return lac;
+    }
+
+    private static final Recycler<LastAddConfirmedUpdateNotification> RECYCLER =
+        new Recycler<LastAddConfirmedUpdateNotification>() {
+            @Override
+            protected LastAddConfirmedUpdateNotification newObject(Handle<LastAddConfirmedUpdateNotification> handle) {
+                return new LastAddConfirmedUpdateNotification(handle);
+            }
+        };
+
+    private final Handle<LastAddConfirmedUpdateNotification> handle;
+    private long lastAddConfirmed;
+    private long timestamp;
+
+    public LastAddConfirmedUpdateNotification(Handle<LastAddConfirmedUpdateNotification> handle) {
+        this.handle = handle;
+    }
 
-    public LastAddConfirmedUpdateNotification(long lastAddConfirmed) {
-        this.lastAddConfirmed = lastAddConfirmed;
-        this.timestamp = System.currentTimeMillis();
+    @Override
+    public void recycle() {
+        this.lastAddConfirmed = -1L;
+        this.timestamp = -1L;
+        handle.recycle(this);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
index 26d5245..14d4825 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCache.java
@@ -24,8 +24,7 @@ package org.apache.bookkeeper.bookie;
 import io.netty.buffer.ByteBuf;
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
 
 /**
  * This class maps a ledger entry number into a location (entrylogid, offset) in
@@ -49,7 +48,9 @@ interface LedgerCache extends Closeable {
 
     Long getLastAddConfirmed(long ledgerId) throws IOException;
     long updateLastAddConfirmed(long ledgerId, long lac) throws IOException;
-    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException;
+    boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                          long previousLAC,
+                                          Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
 
     void deleteLedger(long ledgerId) throws IOException;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
index 0b63e32..1db7d47 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerCacheImpl.java
@@ -23,8 +23,7 @@ package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -85,12 +84,13 @@ public class LedgerCacheImpl implements LedgerCache {
     }
 
     @Override
-    public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer)
+    public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                 long previousLAC,
+                                                 Watcher<LastAddConfirmedUpdateNotification> watcher)
             throws IOException {
-        return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previoisLAC, observer);
+        return indexPersistenceManager.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 
-
     @Override
     public void putEntryOffset(long ledger, long entry, long offset) throws IOException {
         indexPageManager.putEntryOffset(ledger, entry, offset);
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
index 970cec0..f4db7a6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptor.java
@@ -27,8 +27,7 @@ import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import org.apache.bookkeeper.common.util.Watcher;
 
 /**
  * Implements a ledger inside a bookie. In particular, it implements operations
@@ -78,7 +77,9 @@ public abstract class LedgerDescriptor {
     abstract ByteBuf readEntry(long entryId) throws IOException;
 
     abstract long getLastAddConfirmed() throws IOException;
-    abstract Observable waitForLastAddConfirmedUpdate(long previoisLAC, Observer observer) throws IOException;
+    abstract boolean waitForLastAddConfirmedUpdate(long previousLAC,
+                                                   Watcher<LastAddConfirmedUpdateNotification> watcher)
+        throws IOException;
 
     abstract void setExplicitLac(ByteBuf entry) throws IOException;
 
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
index af55f6a..d126c4c 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerDescriptorImpl.java
@@ -25,9 +25,8 @@ import com.google.common.util.concurrent.SettableFuture;
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -156,7 +155,8 @@ public class LedgerDescriptorImpl extends LedgerDescriptor {
     }
 
     @Override
-    Observable waitForLastAddConfirmedUpdate(long previousLAC, Observer observer) throws IOException {
-        return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, observer);
+    boolean waitForLastAddConfirmedUpdate(long previousLAC,
+                                          Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException {
+        return ledgerStorage.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, watcher);
     }
 }
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
index d2055a8..3e2f7dc 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/LedgerStorage.java
@@ -23,9 +23,8 @@ package org.apache.bookkeeper.bookie;
 
 import io.netty.buffer.ByteBuf;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -119,12 +118,14 @@ public interface LedgerStorage {
     /**
      * Wait for last add confirmed update.
      *
-     * @param previoisLAC - The threshold beyond which we would wait for the update
-     * @param observer  - Observer to notify on update
+     * @param previousLAC - The threshold beyond which we would wait for the update
+     * @param watcher  - Watcher to notify on update
      * @return
      * @throws IOException
      */
-    Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException;
+    boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                          long previousLAC,
+                                          Watcher<LastAddConfirmedUpdateNotification> watcher) throws IOException;
 
     /**
      * Flushes all data in the storage. Once this is called,
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
index 342e788..f60dac7 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/LongPollReadEntryProcessorV3.java
@@ -22,16 +22,14 @@ import com.google.common.base.Stopwatch;
 import io.netty.channel.Channel;
 import io.netty.util.HashedWheelTimer;
 import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.Request;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ReadResponse;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
@@ -41,7 +39,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Processor handling long poll read entry request.
  */
-class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Observer {
+class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Watcher<LastAddConfirmedUpdateNotification> {
 
     private final static Logger logger = LoggerFactory.getLogger(LongPollReadEntryProcessorV3.class);
 
@@ -141,9 +139,9 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser
 
             final Stopwatch startTimeSw = Stopwatch.createStarted();
 
-            final Observable observable;
+            final boolean watched;
             try {
-                observable = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
+                watched = requestProcessor.bookie.waitForLastAddConfirmedUpdate(ledgerId, previousLAC, this);
             } catch (Bookie.NoLedgerException e) {
                 logger.info("No ledger found while longpoll reading ledger {}, previous lac = {}.",
                         ledgerId, previousLAC);
@@ -157,19 +155,16 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser
             registerSuccessfulEvent(requestProcessor.longPollPreWaitStats, startTimeSw);
             lastPhaseStartTime.reset().start();
 
-            if (null != observable) {
-                // successfully registered observable to lac updates
+            if (watched) {
+                // successfully registered watcher to lac updates
                 if (logger.isTraceEnabled()) {
                     logger.trace("Waiting For LAC Update {}: Timeout {}", previousLAC, readRequest.getTimeOut());
                 }
                 synchronized (this) {
-                    expirationTimerTask = requestTimer.newTimeout(new TimerTask() {
-                        @Override
-                        public void run(Timeout timeout) throws Exception {
-                            // When the timeout expires just get whatever is the current
-                            // readLastConfirmed
-                            LongPollReadEntryProcessorV3.this.scheduleDeferredRead(observable, true);
-                        }
+                    expirationTimerTask = requestTimer.newTimeout(timeout -> {
+                        // When the timeout expires just get whatever is the current
+                        // readLastConfirmed
+                        LongPollReadEntryProcessorV3.this.scheduleDeferredRead(true);
                     }, readRequest.getTimeOut(), TimeUnit.MILLISECONDS);
                 }
                 return null;
@@ -188,27 +183,25 @@ class LongPollReadEntryProcessorV3 extends ReadEntryProcessorV3 implements Obser
     }
 
     @Override
-    public void update(Observable observable, Object o) {
-        LastAddConfirmedUpdateNotification newLACNotification = (LastAddConfirmedUpdateNotification)o;
-        if (newLACNotification.lastAddConfirmed > previousLAC) {
-            if (newLACNotification.lastAddConfirmed != Long.MAX_VALUE &&
-                    !lastAddConfirmedUpdateTime.isPresent()) {
-                lastAddConfirmedUpdateTime = Optional.of(newLACNotification.timestamp);
+    public void update(LastAddConfirmedUpdateNotification newLACNotification) {
+        if (newLACNotification.getLastAddConfirmed() > previousLAC) {
+            if (newLACNotification.getLastAddConfirmed() != Long.MAX_VALUE && !lastAddConfirmedUpdateTime.isPresent()) {
+                lastAddConfirmedUpdateTime = Optional.of(newLACNotification.getTimestamp());
             }
             if (logger.isTraceEnabled()) {
                 logger.trace("Last Add Confirmed Advanced to {} for request {}",
-                        newLACNotification.lastAddConfirmed, request);
+                        newLACNotification.getLastAddConfirmed(), request);
             }
-            scheduleDeferredRead(observable, false);
+            scheduleDeferredRead(false);
         }
+        newLACNotification.recycle();
     }
 
-    private synchronized void scheduleDeferredRead(Observable observable, boolean timeout) {
+    private synchronized void scheduleDeferredRead(boolean timeout) {
         if (null == deferredTask) {
             if (logger.isTraceEnabled()) {
                 logger.trace("Deferred Task, expired: {}, request: {}", timeout, request);
             }
-            observable.deleteObserver(this);
             try {
                 shouldReadEntry = true;
                 deferredTask = longPollThreadPool.submit(this);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
index 1c63d39..a179745 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/IndexPersistenceMgrTest.java
@@ -20,6 +20,15 @@
  */
 package org.apache.bookkeeper.bookie;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.DiskChecker;
@@ -31,13 +40,6 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.util.Observable;
-import java.util.Observer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
 /**
  * Test cases for IndexPersistenceMgr
  */
@@ -235,29 +237,27 @@ public class IndexPersistenceMgrTest {
     @Test
     public void testEvictionShouldNotAffectLongPollRead() throws Exception {
         IndexPersistenceMgr indexPersistenceMgr = null;
-        Observer observer = (obs, obj) -> {
-            //no-ops
-        };
+        Watcher<LastAddConfirmedUpdateNotification> watcher = notification -> notification.recycle();
         try {
             indexPersistenceMgr = createIndexPersistenceManager(1);
             indexPersistenceMgr.getFileInfo(lid, masterKey);
             indexPersistenceMgr.getFileInfo(lid, null);
             indexPersistenceMgr.updateLastAddConfirmed(lid, 1);
-            Observable observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
-            // observer shouldn't be null because ledger is not evicted or closed
-            assertNotNull("Observer should not be null", observable);
+            // watch should succeed because ledger is not evicted or closed
+            assertTrue(
+                indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
             // now evict ledger 1 from write cache
             indexPersistenceMgr.getFileInfo(lid + 1, masterKey);
-            observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
-            // even if ledger 1 is evicted from write cache, observer still shouldn't be null
-            assertNotNull("Observer should not be null", observable);
+            // even if ledger 1 is evicted from write cache, watcher should still succeed
+            assertTrue(
+                indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
             // now evict ledger 1 from read cache
             indexPersistenceMgr.getFileInfo(lid + 2, masterKey);
             indexPersistenceMgr.getFileInfo(lid + 2, null);
-            // even if ledger 1 is evicted from both cache, observer still shouldn't be null because it
+            // even if ledger 1 is evicted from both caches, watcher should still succeed because it
             // will create a new FileInfo when cache miss
-            observable = indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, observer);
-            assertNotNull("Observer should not be null", observable);
+            assertTrue(
+                indexPersistenceMgr.waitForLastAddConfirmedUpdate(lid, 1, watcher));
         } finally {
             if (null != indexPersistenceMgr) {
                 indexPersistenceMgr.close();
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
new file mode 100644
index 0000000..90da463
--- /dev/null
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LastAddConfirmedUpdateNotificationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.bookkeeper.bookie;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.junit.Test;
+
+/**
+ * Unit test of {@link LastAddConfirmedUpdateNotification}.
+ */
+public class LastAddConfirmedUpdateNotificationTest {
+
+    @Test
+    public void testGetters() {
+        long lac = System.currentTimeMillis();
+        LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.of(lac);
+
+        long timestamp = System.currentTimeMillis();
+        assertEquals(lac, notification.getLastAddConfirmed());
+        assertTrue(notification.getTimestamp() <= timestamp);
+
+        notification.recycle();
+    }
+
+    @Test
+    public void testRecycle() {
+        long lac = System.currentTimeMillis();
+        LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.of(lac);
+        notification.recycle();
+
+        assertEquals(-1L, notification.getLastAddConfirmed());
+        assertEquals(-1L, notification.getTimestamp());
+    }
+
+    @Test
+    public void testFunc() {
+        long lac = System.currentTimeMillis();
+        LastAddConfirmedUpdateNotification notification = LastAddConfirmedUpdateNotification.FUNC.apply(lac);
+
+        assertEquals(lac, notification.getLastAddConfirmed());
+    }
+
+}
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
index 5f30557..d02f067 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/TestSyncThread.java
@@ -23,8 +23,7 @@ package org.apache.bookkeeper.bookie;
 import io.netty.buffer.ByteBuf;
 import java.io.File;
 import java.io.IOException;
-import java.util.Observable;
-import java.util.Observer;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Callable;
@@ -33,13 +32,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.bookkeeper.conf.TestBKConfiguration;
-import org.apache.bookkeeper.conf.ServerConfiguration;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.LedgerDirsListener;
 import org.apache.bookkeeper.bookie.LedgerDirsManager.NoWritableLedgerDirException;
+import org.apache.bookkeeper.common.util.Watcher;
+import org.apache.bookkeeper.conf.ServerConfiguration;
+import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager;
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.junit.Test;
@@ -341,8 +341,11 @@ public class TestSyncThread {
         }
 
         @Override
-        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException {
-            return null;
+        public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                     long previousLAC,
+                                                     Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+            return false;
         }
 
         @Override
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
index ad21fc1..8cec701 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/GcLedgersTest.java
@@ -21,7 +21,6 @@
 
 package org.apache.bookkeeper.meta;
 
-
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -37,8 +36,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
 import java.util.Queue;
 import java.util.Random;
 import java.util.Set;
@@ -55,11 +52,13 @@ import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.GarbageCollector;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
 import org.apache.bookkeeper.bookie.ScanAndCompareGarbageCollector;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerMetadata;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRange;
 import org.apache.bookkeeper.meta.LedgerManager.LedgerRangeIterator;
@@ -425,8 +424,11 @@ public class GcLedgersTest extends LedgerManagerTestCase {
         }
 
         @Override
-        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException {
-            return null;
+        public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                     long previousLAC,
+                                                     Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+            return false;
         }
     }
 }
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
index 5d357c3..492320c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/meta/LedgerManagerTestCase.java
@@ -27,8 +27,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Map;
 import java.util.NavigableMap;
-import java.util.Observable;
-import java.util.Observer;
 import org.apache.bookkeeper.bookie.BookieException;
 import org.apache.bookkeeper.bookie.CheckpointSource;
 import org.apache.bookkeeper.bookie.CheckpointSource.Checkpoint;
@@ -36,7 +34,9 @@ import org.apache.bookkeeper.bookie.Checkpointer;
 import org.apache.bookkeeper.bookie.CompactableLedgerStorage;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
+import org.apache.bookkeeper.bookie.LastAddConfirmedUpdateNotification;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.common.util.Watcher;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
@@ -210,8 +210,11 @@ public abstract class LedgerManagerTestCase extends BookKeeperClusterTestCase {
         }
 
         @Override
-        public Observable waitForLastAddConfirmedUpdate(long ledgerId, long previoisLAC, Observer observer) throws IOException {
-            return null;
+        public boolean waitForLastAddConfirmedUpdate(long ledgerId,
+                                                     long previousLAC,
+                                                     Watcher<LastAddConfirmedUpdateNotification> watcher)
+                throws IOException {
+            return false;
         }
 
         @Override

-- 
To stop receiving notification emails like this one, please contact
"commits@bookkeeper.apache.org" <co...@bookkeeper.apache.org>.