You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:53 UTC

[21/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java
new file mode 100644
index 0000000..ee17950
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the utilities used across the project.
+ */
+package org.apache.distributedlog.util;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/resources/findbugsExclude.xml b/distributedlog-common/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000..ce2c176
--- /dev/null
+++ b/distributedlog-common/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,32 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+//-->
+<!-- FindBugs exclusions for the distributedlog-common module. -->
+<FindBugsFilter>
+  <!-- Suppress the null-parameter dereference warning reported against FutureUtils. -->
+  <Match>
+    <Class name="org.apache.distributedlog.common.concurrent.FutureUtils"/>
+    <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
+  </Match>
+  <!-- NOTE(review): "FutureUtils$2" is a compiler-generated name (presumably the second anonymous
+       class in FutureUtils); it can shift when anonymous classes are added or reordered. Confirm
+       that it still points at the intended class. -->
+  <Match>
+    <Class name="org.apache.distributedlog.common.concurrent.FutureUtils$2"/>
+    <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+  </Match>
+  <!-- FutureUtils.Void() deliberately starts with an upper-case letter, which trips the
+       method naming convention check. -->
+  <Match>
+    <Class name="org.apache.distributedlog.common.concurrent.FutureUtils"/>
+    <Method name="Void" />
+    <Bug pattern="NM_METHOD_NAMING_CONVENTION" />
+  </Match>
+</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
new file mode 100644
index 0000000..ddfb7ae
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.common.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.LongStream;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.junit.Test;
+
+/**
+ * Unit Test for {@link FutureUtils}.
+ *
+ * <p>Every test completes the futures it creates itself, so the outcome of each
+ * {@link FutureUtils} helper can be asserted without any external service.
+ */
+public class TestFutureUtils {
+
+    /**
+     * Checked exception used as the failure cause throughout these tests.
+     */
+    static class TestException extends IOException {
+        private static final long serialVersionUID = -6256482498453846308L;
+
+        public TestException() {
+            super("test-exception");
+        }
+    }
+
+    /** A future completed through {@code complete} yields the supplied value. */
+    @Test
+    public void testComplete() throws Exception {
+        CompletableFuture<Long> future = FutureUtils.createFuture();
+        FutureUtils.complete(future, 1024L);
+        assertEquals(1024L, FutureUtils.result(future).longValue());
+    }
+
+    /** {@code result} rethrows the exception the future was failed with. */
+    @Test(expected = TestException.class)
+    public void testCompleteExceptionally() throws Exception {
+        CompletableFuture<Long> future = FutureUtils.createFuture();
+        FutureUtils.completeExceptionally(future, new TestException());
+        FutureUtils.result(future);
+    }
+
+    /** The callback registered through {@code whenCompleteAsync} observes the completed value. */
+    @Test
+    public void testWhenCompleteAsync() throws Exception {
+        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+            .name("test-when-complete-async")
+            .corePoolSize(1)
+            .build();
+        try {
+            AtomicLong resultHolder = new AtomicLong(0L);
+            CountDownLatch latch = new CountDownLatch(1);
+            CompletableFuture<Long> future = FutureUtils.createFuture();
+            FutureUtils.whenCompleteAsync(
+                future,
+                (result, cause) -> {
+                    resultHolder.set(result);
+                    latch.countDown();
+                },
+                scheduler,
+                new Object());
+            FutureUtils.complete(future, 1234L);
+            // Bounded wait: if the callback never runs (or throws before counting down)
+            // the test fails instead of hanging forever.
+            assertTrue("callback was not invoked in time", latch.await(10, TimeUnit.SECONDS));
+            assertEquals(1234L, resultHolder.get());
+        } finally {
+            // Release the scheduler threads created for this test.
+            scheduler.shutdown();
+        }
+    }
+
+    /** A successful source future propagates its value to the proxy target. */
+    @Test
+    public void testProxyToSuccess() throws Exception {
+        CompletableFuture<Long> src = FutureUtils.createFuture();
+        CompletableFuture<Long> target = FutureUtils.createFuture();
+        FutureUtils.proxyTo(src, target);
+        FutureUtils.complete(src, 10L);
+        assertEquals(10L, FutureUtils.result(target).longValue());
+    }
+
+    /** A failed source future propagates its exception to the proxy target. */
+    @Test(expected = TestException.class)
+    public void testProxyToFailure() throws Exception {
+        CompletableFuture<Long> src = FutureUtils.createFuture();
+        CompletableFuture<Long> target = FutureUtils.createFuture();
+        FutureUtils.proxyTo(src, target);
+        FutureUtils.completeExceptionally(src, new TestException());
+        FutureUtils.result(target);
+    }
+
+    /** {@code Void()} returns a future that is already completed normally. */
+    @Test
+    public void testVoid() throws Exception {
+        CompletableFuture<Void> voidFuture = FutureUtils.Void();
+        assertTrue(voidFuture.isDone());
+        assertFalse(voidFuture.isCompletedExceptionally());
+        assertFalse(voidFuture.isCancelled());
+    }
+
+    /** Collecting no futures yields an empty result list. */
+    @Test
+    public void testCollectEmptyList() throws Exception {
+        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+        List<Integer> result = FutureUtils.result(FutureUtils.collect(futures));
+        assertTrue(result.isEmpty());
+    }
+
+    /** Collecting ten completed futures yields their values in order. */
+    @Test
+    public void testCollectTenItems() throws Exception {
+        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+        List<Integer> expectedResults = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            futures.add(FutureUtils.value(i));
+            expectedResults.add(i);
+        }
+        List<Integer> results = FutureUtils.result(FutureUtils.collect(futures));
+        assertEquals(expectedResults, results);
+    }
+
+    /** Collecting a list that contains failed futures fails with the underlying exception. */
+    @Test(expected = TestException.class)
+    public void testCollectFailures() throws Exception {
+        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            // only the last future succeeds; all others fail
+            if (i == 9) {
+                futures.add(FutureUtils.value(i));
+            } else {
+                futures.add(FutureUtils.exception(new TestException()));
+            }
+        }
+        FutureUtils.result(FutureUtils.collect(futures));
+    }
+
+    /** {@code within} on an already completed future must not schedule any timeout task. */
+    @Test
+    public void testWithinAlreadyDone() throws Exception {
+        OrderedScheduler scheduler = mock(OrderedScheduler.class);
+        CompletableFuture<Long> doneFuture = FutureUtils.value(1234L);
+        CompletableFuture<Long> withinFuture = FutureUtils.within(
+            doneFuture,
+            10,
+            TimeUnit.MILLISECONDS,
+            new TestException(),
+            scheduler,
+            1234L);
+        TimeUnit.MILLISECONDS.sleep(20);
+        assertTrue(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+        // Match any schedule(...) call. The previous eq(10) matcher compared an Integer against
+        // the long delay argument and therefore could never match, making times(0) vacuous.
+        verify(scheduler, times(0))
+            .schedule(anyObject(), any(Runnable.class), anyLong(), any(TimeUnit.class));
+    }
+
+    /** {@code within} with a zero timeout leaves the returned future pending. */
+    @Test
+    public void testWithinZeroTimeout() throws Exception {
+        OrderedScheduler scheduler = mock(OrderedScheduler.class);
+        CompletableFuture<Long> newFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> withinFuture = FutureUtils.within(
+            newFuture,
+            0,
+            TimeUnit.MILLISECONDS,
+            new TestException(),
+            scheduler,
+            1234L);
+        TimeUnit.MILLISECONDS.sleep(20);
+        assertFalse(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+        // NOTE(review): this verification cannot fail - eq(10) is an Integer matcher applied to a
+        // long parameter, and the timeout passed above is 0 rather than 10. Confirm whether
+        // within() is expected to schedule a task for a zero timeout before tightening it.
+        verify(scheduler, times(0))
+            .schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
+    }
+
+    /** Completing the underlying future before the timeout cancels the scheduled timeout task. */
+    @Test
+    public void testWithinCompleteBeforeTimeout() throws Exception {
+        OrderedScheduler scheduler = mock(OrderedScheduler.class);
+        ScheduledFuture<?> scheduledFuture = mock(ScheduledFuture.class);
+        when(scheduler.schedule(anyObject(), any(Runnable.class), anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocationOnMock -> scheduledFuture);
+        CompletableFuture<Long> newFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> withinFuture = FutureUtils.within(
+            newFuture,
+            Long.MAX_VALUE,
+            TimeUnit.MILLISECONDS,
+            new TestException(),
+            scheduler,
+            1234L);
+        assertFalse(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+
+        newFuture.complete(5678L);
+
+        assertTrue(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+        assertEquals((Long) 5678L, FutureUtils.result(withinFuture));
+
+        verify(scheduledFuture, times(1))
+            .cancel(eq(true));
+    }
+
+    /** {@code ignore} completes normally when the underlying future succeeds. */
+    @Test
+    public void testIgnoreSuccess() {
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
+        underlyFuture.complete(1234L);
+        assertTrue(ignoredFuture.isDone());
+        assertFalse(ignoredFuture.isCompletedExceptionally());
+        assertFalse(ignoredFuture.isCancelled());
+    }
+
+    /** {@code ignore} also completes normally when the underlying future fails. */
+    @Test
+    public void testIgnoreFailure() {
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
+        underlyFuture.completeExceptionally(new TestException());
+        assertTrue(ignoredFuture.isDone());
+        assertFalse(ignoredFuture.isCompletedExceptionally());
+        assertFalse(ignoredFuture.isCancelled());
+    }
+
+    /** The {@code ensure} action runs when the underlying future succeeds. */
+    @Test
+    public void testEnsureSuccess() throws Exception {
+        CountDownLatch ensureLatch = new CountDownLatch(1);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
+            ensureLatch.countDown();
+        });
+        underlyFuture.complete(1234L);
+        FutureUtils.result(ensuredFuture);
+        assertTrue(ensuredFuture.isDone());
+        assertFalse(ensuredFuture.isCompletedExceptionally());
+        assertFalse(ensuredFuture.isCancelled());
+        ensureLatch.await();
+    }
+
+    /** The {@code ensure} action runs when the underlying future fails, and the failure is kept. */
+    @Test
+    public void testEnsureFailure() throws Exception {
+        CountDownLatch ensureLatch = new CountDownLatch(1);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> ensuredFuture = FutureUtils.ensure(underlyFuture, () -> {
+            ensureLatch.countDown();
+        });
+        underlyFuture.completeExceptionally(new TestException());
+        // wait for completion without rethrowing the expected failure
+        FutureUtils.result(FutureUtils.ignore(ensuredFuture));
+        assertTrue(ensuredFuture.isDone());
+        assertTrue(ensuredFuture.isCompletedExceptionally());
+        assertFalse(ensuredFuture.isCancelled());
+        ensureLatch.await();
+    }
+
+    /** The rescue function is not consulted when the underlying future succeeds. */
+    @Test
+    public void testRescueSuccess() throws Exception {
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        // mock(Function.class) is raw by necessity; the mock is never asked to produce a value.
+        @SuppressWarnings("unchecked")
+        Function<Throwable, CompletableFuture<Long>> rescueFuc = mock(Function.class);
+        CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, rescueFuc);
+        underlyFuture.complete(1234L);
+        FutureUtils.result(rescuedFuture);
+        assertTrue(rescuedFuture.isDone());
+        assertFalse(rescuedFuture.isCompletedExceptionally());
+        assertFalse(rescuedFuture.isCancelled());
+        verify(rescueFuc, times(0)).apply(any(Throwable.class));
+    }
+
+    /** When the underlying future fails, the rescued future takes the rescue function's result. */
+    @Test
+    public void testRescueFailure() throws Exception {
+        CompletableFuture<Long> futureCompletedAtRescue = FutureUtils.value(3456L);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, (cause) -> futureCompletedAtRescue);
+        underlyFuture.completeExceptionally(new TestException());
+        FutureUtils.result(rescuedFuture);
+        assertTrue(rescuedFuture.isDone());
+        assertFalse(rescuedFuture.isCompletedExceptionally());
+        assertFalse(rescuedFuture.isCancelled());
+        assertEquals((Long) 3456L, FutureUtils.result(rescuedFuture));
+    }
+
+    /** A successful future registers exactly one successful event on the stats logger. */
+    @Test
+    public void testStatsSuccess() throws Exception {
+        OpStatsLogger statsLogger = mock(OpStatsLogger.class);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> statsFuture = FutureUtils.stats(
+            underlyFuture,
+            statsLogger,
+            Stopwatch.createStarted());
+        underlyFuture.complete(1234L);
+        FutureUtils.result(statsFuture);
+        verify(statsLogger, times(1)).registerSuccessfulEvent(anyLong());
+    }
+
+    /** A failed future registers exactly one failed event on the stats logger. */
+    @Test
+    public void testStatsFailure() throws Exception {
+        OpStatsLogger statsLogger = mock(OpStatsLogger.class);
+        CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+        CompletableFuture<Long> statsFuture = FutureUtils.stats(
+            underlyFuture,
+            statsLogger,
+            Stopwatch.createStarted());
+        underlyFuture.completeExceptionally(new TestException());
+        // wait for completion without rethrowing the expected failure
+        FutureUtils.result(FutureUtils.ignore(statsFuture));
+        verify(statsLogger, times(1)).registerFailedEvent(anyLong());
+    }
+
+    /** {@code processList} applies the function to every element and keeps the order. */
+    @Test
+    public void testProcessListSuccess() throws Exception {
+        List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
+        List<Long> expectedList = Lists.transform(
+            longList,
+            aLong -> 2 * aLong);
+        Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
+        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+            longList,
+            sumFunc,
+            null);
+        assertEquals(expectedList, FutureUtils.result(totalFuture));
+    }
+
+    /** {@code processList} on an empty list yields an empty list. */
+    @Test
+    public void testProcessEmptyList() throws Exception {
+        List<Long> longList = Lists.newArrayList();
+        List<Long> expectedList = Lists.transform(
+            longList,
+            aLong -> 2 * aLong);
+        Function<Long, CompletableFuture<Long>> sumFunc = value -> FutureUtils.value(2 * value);
+        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+            longList,
+            sumFunc,
+            null);
+        assertEquals(expectedList, FutureUtils.result(totalFuture));
+    }
+
+    /** {@code processList} fails with the element's exception once an element fails. */
+    @Test
+    public void testProcessListFailures() throws Exception {
+        List<Long> longList = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
+        AtomicLong total = new AtomicLong(0L);
+        Function<Long, CompletableFuture<Long>> sumFunc = value -> {
+            if (value < 5) {
+                total.addAndGet(value);
+                return FutureUtils.value(2 * value);
+            } else {
+                return FutureUtils.exception(new TestException());
+            }
+        };
+        CompletableFuture<List<Long>> totalFuture = FutureUtils.processList(
+            longList,
+            sumFunc,
+            null);
+        try {
+            FutureUtils.result(totalFuture);
+            fail("Should fail with TestException");
+        } catch (TestException te) {
+            // as expected
+        }
+        // only the elements 0..4 were accumulated: 0 + 1 + 2 + 3 + 4
+        assertEquals(10L, total.get());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java
new file mode 100644
index 0000000..6b3ca58
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writer to write properties to files.
+ *
+ * <p>Properties are accumulated in memory through {@link #setProperty(String, String)} and
+ * {@link #removeProperty(String)} and only reach the file when {@link #save()} is called.
+ */
+public class PropertiesWriter {
+    static final Logger LOG = LoggerFactory.getLogger(PropertiesWriter.class);
+
+    // NOTE(review): opened by the constructor but never written to or closed in this class;
+    // kept so the package-visible field remains available. Consider removing it.
+    final FileOutputStream outputStream;
+    final File configFile;
+    final Properties properties;
+
+    /**
+     * Creates a writer backed by a fresh temporary file.
+     *
+     * @throws Exception if the temporary file cannot be created or opened
+     */
+    public PropertiesWriter() throws Exception {
+        this(null);
+    }
+
+    /**
+     * Creates a writer backed by the given file.
+     *
+     * @param configFile file to write to, or {@code null} to use a new temporary file
+     * @throws Exception if the file cannot be created or opened
+     */
+    public PropertiesWriter(File configFile) throws Exception {
+        if (null == configFile) {
+            this.configFile = File.createTempFile("temp", ".conf");
+        } else {
+            this.configFile = configFile;
+        }
+        this.configFile.deleteOnExit();
+        this.properties = new Properties();
+        // Opening the stream creates the file if it is missing and truncates it otherwise.
+        this.outputStream = new FileOutputStream(this.configFile);
+    }
+
+    /**
+     * Sets a property in memory; call {@link #save()} to persist it.
+     *
+     * @param key property name
+     * @param value property value
+     */
+    public void setProperty(String key, String value) {
+        properties.setProperty(key, value);
+    }
+
+    /**
+     * Removes a property in memory; call {@link #save()} to persist the removal.
+     *
+     * @param key property name
+     */
+    public void removeProperty(String key) {
+        properties.remove(key);
+    }
+
+    /**
+     * Rewrites the file with the current properties and moves its last-modified time
+     * forward by one second.
+     *
+     * @throws Exception if the file cannot be written
+     */
+    public void save() throws Exception {
+        // Close the stream after every save so file handles are not leaked and the
+        // content is fully flushed before the timestamp is adjusted.
+        try (FileOutputStream out = new FileOutputStream(configFile)) {
+            properties.store(out, null);
+        }
+        // Presumably bumps the timestamp so that file-change detection with coarse
+        // (about 1 second) granularity notices the rewrite - verify against callers.
+        configFile.setLastModified(configFile.lastModified() + 1000);
+        LOG.debug("save modified={}", configFile.lastModified());
+    }
+
+    /**
+     * Returns the file this writer saves to.
+     *
+     * @return the backing file
+     */
+    public File getFile() {
+        return configFile;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java
new file mode 100644
index 0000000..a54faa0
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit test of {@link ConcurrentBaseConfiguration}.
+ */
+public class TestConcurrentBaseConfiguration {
+
+    /**
+     * Exercises set, overwrite and clear on one property, then checks that clearing
+     * one of two properties leaves the other untouched.
+     */
+    @Test(timeout = 20000)
+    public void testBasicOperations() throws Exception {
+        final String firstKey = "prop1";
+        final String secondKey = "prop2";
+        final ConcurrentBaseConfiguration config = new ConcurrentBaseConfiguration();
+
+        // single property: set, overwrite, clear
+        config.setProperty(firstKey, "1");
+        assertEquals(1, config.getInt(firstKey));
+        config.setProperty(firstKey, "2");
+        assertEquals(2, config.getInt(firstKey));
+        config.clearProperty(firstKey);
+        assertEquals(null, config.getInteger(firstKey, null));
+
+        // two properties: both readable after being set
+        config.setProperty(firstKey, "1");
+        config.setProperty(secondKey, "2");
+        assertEquals(1, config.getInt(firstKey));
+        assertEquals(2, config.getInt(secondKey));
+
+        // clearing the first one must not affect the second one
+        config.clearProperty(firstKey);
+        assertEquals(null, config.getInteger(firstKey, null));
+        assertEquals(2, config.getInt(secondKey));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java
new file mode 100644
index 0000000..a474f89
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.jmock.lib.concurrent.DeterministicScheduler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit test of {@link ConfigurationSubscription}.
+ *
+ * <p>Notes:
+ * 1. lastModified granularity is platform dependent, generally 1 sec, so we can't wait 1ms for things to
+ * get picked up.
+ */
+public class TestConfigurationSubscription {
+    static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class);
+
+    /**
+     * Give FileChangedReloadingStrategy some time to start reloading.
+     *
+     * <p>Make sure now!=lastChecked
+     * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()}
+     */
+    private void ensureConfigReloaded() throws InterruptedException {
+        // sleep 1 ms so that System.currentTimeMillis() !=
+        // lastChecked (the time we construct FileChangedReloadingStrategy)
+        Thread.sleep(1);
+    }
+
+    /** An explicit {@code reload()} picks up a saved property and notifies the listener. */
+    @Test(timeout = 60000)
+    public void testReloadConfiguration() throws Exception {
+        PropertiesWriter writer = new PropertiesWriter();
+        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
+        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration());
+        DeterministicScheduler executorService = new DeterministicScheduler();
+        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
+        ConfigurationSubscription confSub =
+                new ConfigurationSubscription(conf, fileConfigBuilders, executorService, 100, TimeUnit.MILLISECONDS);
+        final AtomicReference<ConcurrentBaseConfiguration> confHolder = new AtomicReference<>();
+        confSub.registerListener(new org.apache.distributedlog.common.config.ConfigurationListener() {
+            @Override
+            public void onReload(ConcurrentBaseConfiguration conf) {
+                confHolder.set(conf);
+            }
+        });
+        assertEquals(null, conf.getProperty("prop1"));
+
+        // add
+        writer.setProperty("prop1", "1");
+        writer.save();
+        // ensure the file change reloading event can be triggered
+        ensureConfigReloaded();
+        // reload the config
+        confSub.reload();
+        assertNotNull(confHolder.get());
+        // the listener must receive the very same configuration instance
+        assertTrue(conf == confHolder.get());
+        assertEquals("1", conf.getProperty("prop1"));
+    }
+
+    /** A scheduled reload (driven by ticking the scheduler) picks up a saved property. */
+    @Test(timeout = 60000)
+    public void testAddReloadBasicsConfig() throws Exception {
+        PropertiesWriter writer = new PropertiesWriter();
+        DeterministicScheduler mockScheduler = new DeterministicScheduler();
+        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
+        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration());
+        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
+        ConfigurationSubscription confSub =
+                new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
+        assertEquals(null, conf.getProperty("prop1"));
+
+        // add
+        writer.setProperty("prop1", "1");
+        writer.save();
+        // ensure the file change reloading event can be triggered
+        ensureConfigReloaded();
+        mockScheduler.tick(100, TimeUnit.MILLISECONDS);
+        assertEquals("1", conf.getProperty("prop1"));
+
+    }
+
+    /** Properties already present in the file are visible right after construction. */
+    @Test(timeout = 60000)
+    public void testInitialConfigLoad() throws Exception {
+        PropertiesWriter writer = new PropertiesWriter();
+        writer.setProperty("prop1", "1");
+        writer.setProperty("prop2", "abc");
+        writer.setProperty("prop3", "123.0");
+        writer.setProperty("prop4", "11132");
+        writer.setProperty("prop5", "true");
+        writer.save();
+
+        ScheduledExecutorService mockScheduler = new DeterministicScheduler();
+        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
+        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration());
+        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
+        ConfigurationSubscription confSub =
+                new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
+        assertEquals(1, conf.getInt("prop1"));
+        assertEquals("abc", conf.getString("prop2"));
+        assertEquals(123.0, conf.getFloat("prop3"), 0);
+        assertEquals(11132, conf.getInt("prop4"));
+        assertEquals(true, conf.getBoolean("prop5"));
+    }
+
+    /**
+     * Reloading keeps working after a configuration listener has thrown: two consecutive
+     * updates must each reach the listener.
+     */
+    @Test(timeout = 60000)
+    public void testExceptionInConfigLoad() throws Exception {
+        PropertiesWriter writer = new PropertiesWriter();
+        writer.setProperty("prop1", "1");
+        writer.save();
+
+        DeterministicScheduler mockScheduler = new DeterministicScheduler();
+        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
+        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration());
+        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
+        ConfigurationSubscription confSub =
+                new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
+
+        final AtomicInteger count = new AtomicInteger(1);
+        conf.addConfigurationListener(new ConfigurationListener() {
+            @Override
+            public void configurationChanged(ConfigurationEvent event) {
+                LOG.info("config changed {}", event);
+                // Throw after so we actually see the update anyway.
+                if (!event.isBeforeUpdate()) {
+                    count.getAndIncrement();
+                    throw new RuntimeException("config listener threw an exception");
+                }
+            }
+        });
+
+        int i = 0;
+        // Start from the counter's current value; a hard-coded 0 never equals the initial
+        // count of 1, which silently skipped this first wait loop.
+        int initial = count.get();
+        while (count.get() == initial) {
+            writer.setProperty("prop1", Integer.toString(i++));
+            writer.save();
+            mockScheduler.tick(100, TimeUnit.MILLISECONDS);
+        }
+
+        // the listener has thrown once; a further update must still be delivered
+        initial = count.get();
+        while (count.get() == initial) {
+            writer.setProperty("prop1", Integer.toString(i++));
+            writer.save();
+            mockScheduler.tick(100, TimeUnit.MILLISECONDS);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java
new file mode 100644
index 0000000..7a981d1
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.notification.Failure;
+
+/**
+ * Test Case for {@link TimedOutTestsListener}.
+ */
+public class TestTimedOutTestsListener {
+
+    private static class Deadlock {
+        private CyclicBarrier barrier = new CyclicBarrier(6);
+
+        public Deadlock() {
+            DeadlockThread[] dThreads = new DeadlockThread[6];
+
+            Monitor a = new Monitor("a");
+            Monitor b = new Monitor("b");
+            Monitor c = new Monitor("c");
+            dThreads[0] = new DeadlockThread("MThread-1", a, b);
+            dThreads[1] = new DeadlockThread("MThread-2", b, c);
+            dThreads[2] = new DeadlockThread("MThread-3", c, a);
+
+            Lock d = new ReentrantLock();
+            Lock e = new ReentrantLock();
+            Lock f = new ReentrantLock();
+
+            dThreads[3] = new DeadlockThread("SThread-4", d, e);
+            dThreads[4] = new DeadlockThread("SThread-5", e, f);
+            dThreads[5] = new DeadlockThread("SThread-6", f, d);
+
+            // make them daemon threads so that the test will exit
+            for (int i = 0; i < 6; i++) {
+                dThreads[i].setDaemon(true);
+                dThreads[i].start();
+            }
+        }
+
+        class DeadlockThread extends Thread {
+            private Lock lock1 = null;
+
+            private Lock lock2 = null;
+
+            private Monitor mon1 = null;
+
+            private Monitor mon2 = null;
+
+            private boolean useSync;
+
+            DeadlockThread(String name, Lock lock1, Lock lock2) {
+                super(name);
+                this.lock1 = lock1;
+                this.lock2 = lock2;
+                this.useSync = true;
+            }
+
+            DeadlockThread(String name, Monitor mon1, Monitor mon2) {
+                super(name);
+                this.mon1 = mon1;
+                this.mon2 = mon2;
+                this.useSync = false;
+            }
+
+            public void run() {
+                if (useSync) {
+                    syncLock();
+                } else {
+                    monitorLock();
+                }
+            }
+
+            private void syncLock() {
+                lock1.lock();
+                try {
+                    try {
+                        barrier.await();
+                    } catch (Exception e) {
+                    }
+                    goSyncDeadlock();
+                } finally {
+                    lock1.unlock();
+                }
+            }
+
+            private void goSyncDeadlock() {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                }
+                lock2.lock();
+                throw new RuntimeException("should not reach here.");
+            }
+
+            private void monitorLock() {
+                synchronized (mon1) {
+                    try {
+                        barrier.await();
+                    } catch (Exception e) {
+                    }
+                    goMonitorDeadlock();
+                }
+            }
+
+            private void goMonitorDeadlock() {
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                }
+                synchronized (mon2) {
+                    throw new RuntimeException(getName() + " should not reach here.");
+                }
+            }
+        }
+
+        class Monitor {
+            String name;
+
+            Monitor(String name) {
+                this.name = name;
+            }
+        }
+
+    }
+
+    @Test(timeout = 500)
+    public void testThreadDumpAndDeadlocks() throws Exception {
+        new Deadlock();
+        String s = null;
+        while (true) {
+            s = TimedOutTestsListener.buildDeadlockInfo();
+            if (s != null) {
+                break;
+            }
+            Thread.sleep(100);
+        }
+
+        Assert.assertEquals(3, countStringOccurrences(s, "BLOCKED"));
+
+        Failure failure = new Failure(null, new Exception(TimedOutTestsListener.TEST_TIMED_OUT_PREFIX));
+        StringWriter writer = new StringWriter();
+        new TimedOutTestsListener(new PrintWriter(writer)).testFailure(failure);
+        String out = writer.toString();
+
+        Assert.assertTrue(out.contains("THREAD DUMP"));
+        Assert.assertTrue(out.contains("DEADLOCKS DETECTED"));
+
+        System.out.println(out);
+    }
+
+    private int countStringOccurrences(String s, String substr) {
+        int n = 0;
+        int index = 0;
+        while ((index = s.indexOf(substr, index) + 1) != 0) {
+            n++;
+        }
+        return n;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java
new file mode 100644
index 0000000..c86cf8f
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.junit.runner.notification.Failure;
+import org.junit.runner.notification.RunListener;
+
+/**
+ * JUnit run listener which prints a full thread dump (to System.err by default) when a test fails
+ * due to a timeout.
+ */
+public class TimedOutTestsListener extends RunListener {
+
+    static final String TEST_TIMED_OUT_PREFIX = "test timed out after";
+
+    private static String indent = "    ";
+
+    private final PrintWriter output;
+
+    public TimedOutTestsListener() {
+        this.output = new PrintWriter(System.err);
+    }
+
+    public TimedOutTestsListener(PrintWriter output) {
+        this.output = output;
+    }
+
+    @Override
+    public void testFailure(Failure failure) throws Exception {
+        if (failure != null && failure.getMessage() != null && failure.getMessage().startsWith(TEST_TIMED_OUT_PREFIX)) {
+            output.println("====> TEST TIMED OUT. PRINTING THREAD DUMP. <====");
+            output.println();
+            output.append(buildThreadDiagnosticString()).flush(); // flush: the System.err writer is buffered
+        }
+    }
+
+    public static String buildThreadDiagnosticString() {
+        StringWriter sw = new StringWriter();
+        PrintWriter output = new PrintWriter(sw);
+
+        DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); // HH = 24-hour clock
+        output.println(String.format("Timestamp: %s", dateFormat.format(new Date())));
+        output.println();
+        output.println(buildThreadDump());
+
+        String deadlocksInfo = buildDeadlockInfo();
+        if (deadlocksInfo != null) {
+            output.println("====> DEADLOCKS DETECTED <====");
+            output.println();
+            output.println(deadlocksInfo);
+        }
+
+        return sw.toString();
+    }
+
+    static String buildThreadDump() {
+        StringBuilder dump = new StringBuilder();
+        Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
+        for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) {
+            Thread thread = e.getKey();
+            dump.append(String.format("\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s", thread.getName(),
+                (thread.isDaemon() ? "daemon" : ""), thread.getPriority(), thread.getId(),
+                Thread.State.WAITING.equals(thread.getState()) ? "in Object.wait()"
+                        : StringUtils.lowerCase(thread.getState().name()),
+                Thread.State.WAITING.equals(thread.getState()) ? "WAITING (on object monitor)" : thread.getState()));
+            for (StackTraceElement stackTraceElement : e.getValue()) {
+                dump.append("\n        at ");
+                dump.append(stackTraceElement);
+            }
+            dump.append("\n");
+        }
+        return dump.toString();
+    }
+
+    static String buildDeadlockInfo() {
+        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+        long[] deadlockedIds = threadBean.findMonitorDeadlockedThreads();
+        if (deadlockedIds == null || deadlockedIds.length == 0) {
+            // No monitor deadlock reported; callers treat null as "nothing to print".
+            return null;
+        }
+
+        StringWriter buffer = new StringWriter();
+        PrintWriter out = new PrintWriter(buffer);
+        // Dump each deadlocked thread with its locked monitors and ownable synchronizers.
+        for (ThreadInfo ti : threadBean.getThreadInfo(deadlockedIds, true, true)) {
+            printThreadInfo(ti, out);
+            printLockInfo(ti.getLockedSynchronizers(), out);
+            out.println();
+        }
+
+        out.close();
+        return buffer.toString();
+    }
+
+    private static void printThreadInfo(ThreadInfo ti, PrintWriter out) {
+        // print thread information
+        printThread(ti, out);
+
+        // print stack trace with locks
+        StackTraceElement[] stacktrace = ti.getStackTrace();
+        MonitorInfo[] monitors = ti.getLockedMonitors();
+        for (int i = 0; i < stacktrace.length; i++) {
+            StackTraceElement ste = stacktrace[i];
+            out.println(indent + "at " + ste.toString());
+            for (MonitorInfo mi : monitors) {
+                if (mi.getLockedStackDepth() == i) {
+                    out.println(indent + "  - locked " + mi);
+                }
+            }
+        }
+        out.println();
+    }
+
+    private static void printThread(ThreadInfo ti, PrintWriter out) {
+        out.print("\"" + ti.getThreadName() + "\"" + " Id=" + ti.getThreadId() + " in " + ti.getThreadState());
+        if (ti.getLockName() != null) {
+            out.print(" on lock=" + ti.getLockName());
+        }
+        if (ti.isSuspended()) {
+            out.print(" (suspended)");
+        }
+        if (ti.isInNative()) {
+            out.print(" (running in native)");
+        }
+        out.println();
+        if (ti.getLockOwnerName() != null) {
+            out.println(indent + " owned by " + ti.getLockOwnerName() + " Id=" + ti.getLockOwnerId());
+        }
+    }
+
+    private static void printLockInfo(LockInfo[] locks, PrintWriter out) {
+        out.println(indent + "Locked synchronizers: count = " + locks.length);
+        for (LockInfo li : locks) {
+            out.println(indent + "  - " + li);
+        }
+        out.println();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/pom.xml b/distributedlog-core-twitter/pom.xml
new file mode 100644
index 0000000..41116b9
--- /dev/null
+++ b/distributedlog-core-twitter/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.distributedlog</groupId>
+    <artifactId>distributedlog</artifactId>
+    <version>0.5.0-incubating-SNAPSHOT</version>
+  </parent>
+  <artifactId>distributedlog-core-twitter</artifactId>
+  <name>Apache DistributedLog :: Core Library (Twitter Future Interface)</name>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-core</artifactId>
+      <version>${project.parent.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>util-core_2.11</artifactId>
+      <version>${finagle.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>${junit.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <version>${slf4j.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <version>${mockito.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.distributedlog</groupId>
+      <artifactId>distributedlog-common</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+  <build>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <version>${maven-compiler-plugin.version}</version>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>${maven-jar-plugin.version}</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin.version}</version>
+        <configuration>
+          <redirectTestOutputToFile>true</redirectTestOutputToFile>
+          <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
+          <forkMode>always</forkMode>
+          <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
+          <properties>
+            <property>
+              <name>listener</name>
+              <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value>
+            </property>
+          </properties>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>${maven-checkstyle-plugin.version}</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>${puppycrawl.checkstyle.version}</version>
+          </dependency>
+          <dependency>
+            <groupId>org.apache.distributedlog</groupId>
+            <artifactId>distributedlog-build-tools</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>distributedlog/checkstyle.xml</configLocation>
+          <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeResources>false</includeResources>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java
new file mode 100644
index 0000000..4ec1dfa
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.twitter.util.Future;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A log reader to read records in asynchronous way.
+ */
+public interface AsyncLogReader {
+
+    /**
+     * Get the name of the stream that the reader reads from.
+     *
+     * @return stream name.
+     */
+    String getStreamName();
+
+    /**
+     * Read the next record from the log stream.
+     *
+     * @return A promise that when satisfied will contain the Log Record with its DLSN.
+     */
+    Future<LogRecordWithDLSN> readNext();
+
+    /**
+     * Read the next <i>numEntries</i> entries. The future is only satisfied with a non-empty list
+     * of entries. It doesn't wait until exactly <i>numEntries</i> entries can be returned; it is a
+     * best-effort call.
+     *
+     * @param numEntries
+     *          number of entries to read (best effort)
+     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
+     */
+    Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
+
+    /**
+     * Read the next <i>numEntries</i> entries within a given <i>waitTime</i>.
+     *
+     * <p>The future is satisfied when either <i>numEntries</i> entries have been read or <i>waitTime</i>
+     * has elapsed. The only exception is when no new entries are written within <i>waitTime</i>; in that
+     * case it waits until new entries are available.
+     *
+     * @param numEntries
+     *          max entries to return
+     * @param waitTime
+     *          maximum wait time if there are already entries available to read
+     * @param timeUnit
+     *          wait time unit
+     * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
+     */
+    Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
+
+    /**
+     * Closes this source and releases any system resources associated
+     * with it. If the source is already closed then invoking this
+     * method has no effect.
+     *
+     * @return future representing the close result.
+     */
+    Future<Void> asyncClose();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java
new file mode 100644
index 0000000..4f4a90e
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.apache.distributedlog.util.FutureUtils.newTFuture;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.util.Future;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * The implementation of {@link AsyncLogReader} built over {@link org.apache.distributedlog.api.AsyncLogReader}.
+ */
+class AsyncLogReaderImpl implements AsyncLogReader {
+
+    static final Function1<org.apache.distributedlog.api.AsyncLogReader, AsyncLogReader> MAP_FUNC =
+        new AbstractFunction1<org.apache.distributedlog.api.AsyncLogReader, AsyncLogReader>() {
+            @Override
+            public AsyncLogReader apply(org.apache.distributedlog.api.AsyncLogReader reader) {
+                return new AsyncLogReaderImpl(reader);
+            }
+        };
+
+    private final org.apache.distributedlog.api.AsyncLogReader impl;
+
+    AsyncLogReaderImpl(org.apache.distributedlog.api.AsyncLogReader impl) {
+        this.impl = impl;
+    }
+
+    @VisibleForTesting
+    org.apache.distributedlog.api.AsyncLogReader getImpl() {
+        return impl;
+    }
+
+    @Override
+    public String getStreamName() {
+        return impl.getStreamName();
+    }
+
+    @Override
+    public Future<LogRecordWithDLSN> readNext() {
+        return newTFuture(impl.readNext());
+    }
+
+    @Override
+    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
+        return newTFuture(impl.readBulk(numEntries));
+    }
+
+    @Override
+    public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit) {
+        return newTFuture(impl.readBulk(numEntries, waitTime, timeUnit));
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return newTFuture(impl.asyncClose());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
new file mode 100644
index 0000000..915877c
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.twitter.util.Future;
+import java.util.List;
+
+/**
+ * A writer that appends log records asynchronously.
+ */
+public interface AsyncLogWriter {
+
+    /**
+     * Get the last committed transaction id.
+     *
+     * @return last committed transaction id.
+     */
+    long getLastTxId();
+
+    /**
+     * Write a log record to the stream.
+     *
+     * @param record single log record
+     * @return A Future which contains a DLSN if the record was successfully written
+     * or an exception if the write fails
+     */
+    Future<DLSN> write(LogRecord record);
+
+    /**
+     * Write log records to the stream in bulk. Each future in the list represents the result of
+     * one write operation. The size of the result list is equal to the size of the input list.
+     * Records are written in order, and the list of result futures has the same order.
+     *
+     * @param record list of log records
+     * @return A Future which contains a list of Future DLSNs (one per input record, in the same order)
+     * or an exception if the operation fails.
+     */
+    Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);
+
+    /**
+     * Truncate the log until <i>dlsn</i>.
+     *
+     * @param dlsn
+     *          dlsn to truncate until.
+     * @return A Future indicating whether the operation succeeds or not, or an exception
+     * if the truncation fails.
+     */
+    Future<Boolean> truncate(DLSN dlsn);
+
+    /**
+     * Get the name of the stream this writer writes data to.
+     */
+    String getStreamName();
+
+    /**
+     * Closes this source and releases any system resources associated
+     * with it. If the source is already closed then invoking this
+     * method has no effect.
+     *
+     * @return future representing the close result.
+     */
+    Future<Void> asyncClose();
+
+    /**
+     * Aborts the object and releases any resources associated with it.
+     * If the object is already aborted then invoking this method has no
+     * effect.
+     *
+     * @return future representing the abort result
+     */
+    Future<Void> asyncAbort();
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java
new file mode 100644
index 0000000..dc28bb1
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.apache.distributedlog.util.FutureUtils.newTFuture;
+import static org.apache.distributedlog.util.FutureUtils.newTFutureList;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.util.Future;
+import java.util.List;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * The implementation of {@link AsyncLogWriter} built over {@link org.apache.distributedlog.api.AsyncLogWriter}.
+ */
+class AsyncLogWriterImpl implements AsyncLogWriter {
+
+    static final Function1<org.apache.distributedlog.api.AsyncLogWriter, AsyncLogWriter> MAP_FUNC =
+        new AbstractFunction1<org.apache.distributedlog.api.AsyncLogWriter, AsyncLogWriter>() {
+            @Override
+            public AsyncLogWriter apply(org.apache.distributedlog.api.AsyncLogWriter writer) {
+                return new AsyncLogWriterImpl(writer);
+            }
+        };
+
+    private final org.apache.distributedlog.api.AsyncLogWriter impl;
+
+    AsyncLogWriterImpl(org.apache.distributedlog.api.AsyncLogWriter impl) {
+        this.impl = impl;
+    }
+
+    @VisibleForTesting
+    org.apache.distributedlog.api.AsyncLogWriter getImpl() {
+        return impl;
+    }
+
+    @Override
+    public long getLastTxId() {
+        return impl.getLastTxId();
+    }
+
+    @Override
+    public Future<DLSN> write(LogRecord record) {
+        return newTFuture(impl.write(record));
+    }
+
+    @Override
+    public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record) {
+        return newTFutureList(impl.writeBulk(record));
+    }
+
+    @Override
+    public Future<Boolean> truncate(DLSN dlsn) {
+        return newTFuture(impl.truncate(dlsn));
+    }
+
+    @Override
+    public String getStreamName() {
+        return impl.getStreamName();
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return newTFuture(impl.asyncClose());
+    }
+
+    @Override
+    public Future<Void> asyncAbort() {
+        return newTFuture(impl.asyncAbort());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java
new file mode 100644
index 0000000..14f05c3
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.twitter.util.Future;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+
+/**
+ * A DistributedLogManager is responsible for managing a single place of storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or failed and restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this class, which is created when the EditLog is first opened.
+ */
+public interface DistributedLogManager extends Closeable {
+
+    /**
+     * Get the name of the stream managed by this log manager.
+     *
+     * @return streamName
+     */
+    String getStreamName();
+
+    /**
+     * Get the namespace driver used by this manager.
+     *
+     * @return the namespace driver
+     */
+    NamespaceDriver getNamespaceDriver();
+
+    /**
+     * Get log segments.
+     *
+     * @return log segments
+     * @throws IOException
+     */
+    List<LogSegmentMetadata> getLogSegments() throws IOException;
+
+    /**
+     * Register <i>listener</i> on log segment updates of this stream.
+     *
+     * @param listener
+     *          listener to receive updates of the log segment list.
+     */
+    void registerListener(LogSegmentListener listener) throws IOException;
+
+    /**
+     * Unregister <i>listener</i> on log segment updates from this stream.
+     *
+     * @param listener
+     *          listener to unregister.
+     */
+    void unregisterListener(LogSegmentListener listener);
+
+    /**
+     * Open async log writer to write records to the log stream.
+     *
+     * @return future of the opened async log writer
+     */
+    Future<AsyncLogWriter> openAsyncLogWriter();
+
+    /**
+     * Begin writing to the log stream identified by the name.
+     *
+     * @return the writer interface to generate log records
+     */
+    LogWriter startLogSegmentNonPartitioned() throws IOException;
+
+    /**
+     * Begin writing to the log stream identified by the name.
+     *
+     * @return the async writer interface to generate log records
+     */
+    // @Deprecated
+    AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;
+
+    /**
+     * Begin appending to the end of the log stream which is being treated as a sequence of bytes.
+     *
+     * @return the writer interface to generate log records
+     */
+    AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;
+
+    /**
+     * Get a reader to read a log stream as a sequence of bytes.
+     *
+     * @return the reader interface to read the log stream as a sequence of bytes
+     */
+    AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;
+
+    /**
+     * Get the input stream starting with fromTxnId for the specified log.
+     *
+     * @param fromTxnId - the first transaction id we want to read
+     * @return the stream starting with transaction fromTxnId
+     * @throws IOException if a stream cannot be found.
+     */
+    LogReader getInputStream(long fromTxnId)
+        throws IOException;
+
+    LogReader getInputStream(DLSN fromDLSN) throws IOException;
+
+    /**
+     * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
+     *
+     * @param fromTxnId
+     *          transaction id to start reading from
+     * @return future of the async log reader
+     */
+    Future<AsyncLogReader> openAsyncLogReader(long fromTxnId);
+
+    /**
+     * Open an async log reader to read records from a log starting from <code>fromDLSN</code>.
+     *
+     * @param fromDLSN
+     *          dlsn to start reading from
+     * @return future of the async log reader
+     */
+    Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);
+
+    // @Deprecated
+    AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;
+
+    // @Deprecated
+    AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;
+
+    Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN);
+
+    /**
+     * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
+     * If two readers try to open using the same subscriberId, one succeeds, while the other
+     * is blocked until it gets the lock.
+     *
+     * @param fromDLSN
+     *          start dlsn
+     * @param subscriberId
+     *          subscriber id
+     * @return future of the async log reader
+     */
+    Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);
+
+    /**
+     * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
+     * its last commit position recorded in subscription store. If no last commit position found
+     * in subscription store, it would start reading from head of the stream.
+     *
+     * <p>If two readers try to open using the same subscriberId, one succeeds, while the other
+     * is blocked until it gets the lock.
+     *
+     * @param subscriberId
+     *          subscriber id
+     * @return future of the async log reader
+     */
+    Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);
+
+    /**
+     * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
+     *
+     * @param transactionId
+     *          transaction id
+     * @return future of the dlsn of first log record whose transaction id is not less than transactionId.
+     */
+    Future<DLSN> getDLSNNotLessThanTxId(long transactionId);
+
+    /**
+     * Get the last log record in the stream.
+     *
+     * @return the last log record in the stream
+     * @throws IOException if a stream cannot be found.
+     */
+    LogRecordWithDLSN getLastLogRecord()
+        throws IOException;
+
+    /**
+     * Get the earliest Transaction Id available in the log.
+     *
+     * @return earliest transaction id
+     * @throws IOException
+     */
+    long getFirstTxId() throws IOException;
+
+    /**
+     * Get Latest Transaction Id in the log.
+     *
+     * @return latest transaction id
+     * @throws IOException
+     */
+    long getLastTxId() throws IOException;
+
+    /**
+     * Get Latest DLSN in the log.
+     *
+     * @return last dlsn
+     * @throws IOException
+     */
+    DLSN getLastDLSN() throws IOException;
+
+    /**
+     * Get Latest log record with DLSN in the log - async.
+     *
+     * @return future of the latest log record with DLSN
+     */
+    Future<LogRecordWithDLSN> getLastLogRecordAsync();
+
+    /**
+     * Get Latest Transaction Id in the log - async.
+     *
+     * @return future of the latest transaction id
+     */
+    Future<Long> getLastTxIdAsync();
+
+    /**
+     * Get first DLSN in the log - async.
+     *
+     * @return future of the first dlsn in the stream
+     */
+    Future<DLSN> getFirstDLSNAsync();
+
+    /**
+     * Get Latest DLSN in the log - async.
+     *
+     * @return future of the latest dlsn
+     */
+    Future<DLSN> getLastDLSNAsync();
+
+    /**
+     * Get the number of log records in the active portion of the log.
+     *
+     * <p>Any log segments that have already been truncated will not be included.
+     *
+     * @return number of log records
+     * @throws IOException
+     */
+    long getLogRecordCount() throws IOException;
+
+    /**
+     * Get the number of log records in the active portion of the log - async.
+     *
+     * <p>Any log segments that have already been truncated will not be included.
+     *
+     * @param beginDLSN dlsn the count begins at (inclusivity is not specified here)
+     * @return future number of log records
+     */
+    Future<Long> getLogRecordCountAsync(final DLSN beginDLSN);
+
+    /**
+     * Run recovery on the log.
+     *
+     * @throws IOException
+     */
+    void recover() throws IOException;
+
+    /**
+     * Check if an end of stream marker was added to the stream.
+     * A stream with an end of stream marker cannot be appended to.
+     *
+     * @return true if the marker was added to the stream, false otherwise
+     * @throws IOException
+     */
+    boolean isEndOfStreamMarked() throws IOException;
+
+    /**
+     * Delete the log.
+     *
+     * @throws IOException if the deletion fails
+     */
+    void delete() throws IOException;
+
+    /**
+     * The DistributedLogManager may archive/purge any logs for transactionId
+     * less than minTxIdToKeep.
+     * This is to be used only when the client explicitly manages deletion. If
+     * the cleanup policy is based on sliding time window, then this method need
+     * not be called.
+     *
+     * @param minTxIdToKeep the earliest txid that must be retained
+     * @throws IOException if purging fails
+     */
+    void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
+
+    /**
+     * Get the subscriptions store provided by the distributedlog manager.
+     *
+     * @return subscriptions store manages subscriptions for current stream.
+     */
+    SubscriptionsStore getSubscriptionsStore();
+
+    /**
+     * Closes this source and releases any system resources associated
+     * with it. If the source is already closed then invoking this
+     * method has no effect.
+     *
+     * @return future representing the close result.
+     */
+    Future<Void> asyncClose();
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java
new file mode 100644
index 0000000..aa3e94e
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.apache.distributedlog.util.FutureUtils.newTFuture;
+
+import com.twitter.util.Future;
+import java.io.IOException;
+import java.util.List;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+
+/**
+ * The wrapper of {@link org.apache.distributedlog.api.DistributedLogManager}.
+ */
+public class DistributedLogManagerImpl implements DistributedLogManager {
+
+    private final org.apache.distributedlog.api.DistributedLogManager impl;
+
+    public DistributedLogManagerImpl(org.apache.distributedlog.api.DistributedLogManager impl) {
+        this.impl = impl;
+    }
+
+    @Override
+    public String getStreamName() {
+        return impl.getStreamName();
+    }
+
+    @Override
+    public NamespaceDriver getNamespaceDriver() {
+        return impl.getNamespaceDriver();
+    }
+
+    @Override
+    public List<LogSegmentMetadata> getLogSegments() throws IOException {
+        return impl.getLogSegments();
+    }
+
+    @Override
+    public void registerListener(LogSegmentListener listener) throws IOException {
+        impl.registerListener(listener);
+    }
+
+    @Override
+    public void unregisterListener(LogSegmentListener listener) {
+        impl.unregisterListener(listener);
+    }
+
+    @Override
+    public Future<AsyncLogWriter> openAsyncLogWriter() {
+        return newTFuture(impl.openAsyncLogWriter()).map(AsyncLogWriterImpl.MAP_FUNC);
+    }
+
+    @Override
+    public LogWriter startLogSegmentNonPartitioned() throws IOException {
+        return new LogWriterImpl(impl.startLogSegmentNonPartitioned());
+    }
+
+    @Override
+    public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
+        return new AsyncLogWriterImpl(impl.startAsyncLogSegmentNonPartitioned());
+    }
+
+    @Override
+    public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException {
+        return impl.getAppendOnlyStreamWriter();
+    }
+
+    @Override
+    public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
+        return impl.getAppendOnlyStreamReader();
+    }
+
+    @Override
+    public LogReader getInputStream(long fromTxnId) throws IOException {
+        return new LogReaderImpl(impl.getInputStream(fromTxnId));
+    }
+
+    @Override
+    public LogReader getInputStream(DLSN fromDLSN) throws IOException {
+        return new LogReaderImpl(impl.getInputStream(fromDLSN));
+    }
+
+    @Override
+    public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) {
+        return newTFuture(impl.openAsyncLogReader(fromTxnId)).map(AsyncLogReaderImpl.MAP_FUNC);
+    }
+
+    @Override
+    public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
+        return newTFuture(impl.openAsyncLogReader(fromDLSN)).map(AsyncLogReaderImpl.MAP_FUNC);
+    }
+
+    @Override
+    public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException {
+        return new AsyncLogReaderImpl(impl.getAsyncLogReader(fromTxnId));
+    }
+
+    @Override
+    public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException {
+        return new AsyncLogReaderImpl(impl.getAsyncLogReader(fromDLSN));
+    }
+
+    @Override
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN) {
+        return newTFuture(impl.getAsyncLogReaderWithLock(fromDLSN)).map(AsyncLogReaderImpl.MAP_FUNC);
+    }
+
+    @Override
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId) {
+        return newTFuture(impl.getAsyncLogReaderWithLock(fromDLSN, subscriberId))
+            .map(AsyncLogReaderImpl.MAP_FUNC);
+    }
+
+    @Override
+    public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) {
+        return newTFuture(impl.getAsyncLogReaderWithLock(subscriberId))
+            .map(AsyncLogReaderImpl.MAP_FUNC);
+    }
+
+    @Override
+    public Future<DLSN> getDLSNNotLessThanTxId(long transactionId) {
+        return newTFuture(impl.getDLSNNotLessThanTxId(transactionId));
+    }
+
+    @Override
+    public LogRecordWithDLSN getLastLogRecord() throws IOException {
+        return impl.getLastLogRecord();
+    }
+
+    @Override
+    public long getFirstTxId() throws IOException {
+        return impl.getFirstTxId();
+    }
+
+    @Override
+    public long getLastTxId() throws IOException {
+        return impl.getLastTxId();
+    }
+
+    @Override
+    public DLSN getLastDLSN() throws IOException {
+        return impl.getLastDLSN();
+    }
+
+    @Override
+    public Future<LogRecordWithDLSN> getLastLogRecordAsync() {
+        return newTFuture(impl.getLastLogRecordAsync());
+    }
+
+    @Override
+    public Future<Long> getLastTxIdAsync() {
+        return newTFuture(impl.getLastTxIdAsync());
+    }
+
+    @Override
+    public Future<DLSN> getFirstDLSNAsync() {
+        return newTFuture(impl.getFirstDLSNAsync());
+    }
+
+    @Override
+    public Future<DLSN> getLastDLSNAsync() {
+        return newTFuture(impl.getLastDLSNAsync());
+    }
+
+    @Override
+    public long getLogRecordCount() throws IOException {
+        return impl.getLogRecordCount();
+    }
+
+    @Override
+    public Future<Long> getLogRecordCountAsync(DLSN beginDLSN) {
+        return newTFuture(impl.getLogRecordCountAsync(beginDLSN));
+    }
+
+    @Override
+    public void recover() throws IOException {
+        impl.recover();
+    }
+
+    @Override
+    public boolean isEndOfStreamMarked() throws IOException {
+        return impl.isEndOfStreamMarked();
+    }
+
+    @Override
+    public void delete() throws IOException {
+        impl.delete();
+    }
+
+    @Override
+    public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
+        impl.purgeLogsOlderThan(minTxIdToKeep);
+    }
+
+    @Override
+    public SubscriptionsStore getSubscriptionsStore() {
+        return new SubscriptionsStoreImpl(impl.getSubscriptionsStore());
+    }
+
+    @Override
+    public void close() throws IOException {
+        impl.close();
+    }
+
+    @Override
+    public Future<Void> asyncClose() {
+        return newTFuture(impl.asyncClose());
+    }
+}