You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:53 UTC
[21/23] incubator-distributedlog git commit: DL-124: Use Java8 Future
rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java b/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java
new file mode 100644
index 0000000..ee17950
--- /dev/null
+++ b/distributedlog-common/src/main/java/org/apache/distributedlog/util/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Defines the utilities used across the project.
+ */
+package org.apache.distributedlog.util;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/main/resources/findbugsExclude.xml b/distributedlog-common/src/main/resources/findbugsExclude.xml
new file mode 100644
index 0000000..ce2c176
--- /dev/null
+++ b/distributedlog-common/src/main/resources/findbugsExclude.xml
@@ -0,0 +1,32 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+//-->
+<!-- FindBugs exclusions for the FutureUtils class and one of its anonymous classes. -->
+<FindBugsFilter>
+ <!-- Ignore NP_NULL_PARAM_DEREF_NONVIRTUAL reports raised against FutureUtils. -->
+ <Match>
+ <Class name="org.apache.distributedlog.common.concurrent.FutureUtils"/>
+ <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
+ </Match>
+ <!-- Ignore NP_NONNULL_PARAM_VIOLATION reports raised against the anonymous class FutureUtils$2. -->
+ <Match>
+ <Class name="org.apache.distributedlog.common.concurrent.FutureUtils$2"/>
+ <Bug pattern="NP_NONNULL_PARAM_VIOLATION" />
+ </Match>
+ <!-- Ignore the naming-convention report for the FutureUtils method named "Void". -->
+ <Match>
+ <Class name="org.apache.distributedlog.common.concurrent.FutureUtils"/>
+ <Method name="Void" />
+ <Bug pattern="NM_METHOD_NAMING_CONVENTION" />
+ </Match>
+</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
new file mode 100644
index 0000000..ddfb7ae
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java
@@ -0,0 +1,384 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.common.concurrent;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
+import java.util.stream.LongStream;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.junit.Test;
+
+/**
+ * Unit Test for {@link FutureUtils}.
+ */
+public class TestFutureUtils {
+
+    /**
+     * Checked exception thrown by the tests to simulate a failed operation.
+     */
+    static class TestException extends IOException {
+        private static final long serialVersionUID = -6256482498453846308L;
+        private static final String MESSAGE = "test-exception";
+
+        public TestException() {
+            super(MESSAGE);
+        }
+    }
+
+ @Test
+ public void testComplete() throws Exception {
+ CompletableFuture<Long> future = FutureUtils.createFuture();
+ FutureUtils.complete(future, 1024L);
+ assertEquals(1024L, FutureUtils.result(future).longValue());
+ }
+
+ @Test(expected = TestException.class)
+ public void testCompleteExceptionally() throws Exception {
+ CompletableFuture<Long> future = FutureUtils.createFuture();
+ FutureUtils.completeExceptionally(future, new TestException());
+ FutureUtils.result(future);
+ }
+
+    /**
+     * Verifies that a callback registered through {@code FutureUtils.whenCompleteAsync}
+     * observes the value the future is completed with.
+     *
+     * <p>The scheduler created for the test is shut down in a {@code finally} block so that
+     * it is released even when an assertion fails.
+     */
+    @Test
+    public void testWhenCompleteAsync() throws Exception {
+        OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+            .name("test-when-complete-async")
+            .corePoolSize(1)
+            .build();
+        try {
+            AtomicLong resultHolder = new AtomicLong(0L);
+            CountDownLatch latch = new CountDownLatch(1);
+            CompletableFuture<Long> future = FutureUtils.createFuture();
+            FutureUtils.whenCompleteAsync(
+                future,
+                (result, cause) -> {
+                    resultHolder.set(result);
+                    latch.countDown();
+                },
+                scheduler,
+                new Object());
+            FutureUtils.complete(future, 1234L);
+            // block until the callback has recorded the result
+            latch.await();
+            assertEquals(1234L, resultHolder.get());
+        } finally {
+            scheduler.shutdown();
+        }
+    }
+
+ @Test
+ public void testProxyToSuccess() throws Exception {
+ CompletableFuture<Long> src = FutureUtils.createFuture();
+ CompletableFuture<Long> target = FutureUtils.createFuture();
+ FutureUtils.proxyTo(src, target);
+ FutureUtils.complete(src, 10L);
+ assertEquals(10L, FutureUtils.result(target).longValue());
+ }
+
+ @Test(expected = TestException.class)
+ public void testProxyToFailure() throws Exception {
+ CompletableFuture<Long> src = FutureUtils.createFuture();
+ CompletableFuture<Long> target = FutureUtils.createFuture();
+ FutureUtils.proxyTo(src, target);
+ FutureUtils.completeExceptionally(src, new TestException());
+ FutureUtils.result(target);
+ }
+
+    /**
+     * Verifies that {@code FutureUtils.Void()} returns a future that is already
+     * completed normally.
+     */
+    @Test
+    public void testVoid() throws Exception {
+        CompletableFuture<Void> completed = FutureUtils.Void();
+        assertTrue(completed.isDone());
+        assertFalse(completed.isCompletedExceptionally());
+        assertFalse(completed.isCancelled());
+    }
+
+    /**
+     * Verifies that collecting an empty list of futures produces an empty result list.
+     */
+    @Test
+    public void testCollectEmptyList() throws Exception {
+        List<CompletableFuture<Integer>> noFutures = Lists.newArrayList();
+        CompletableFuture<List<Integer>> collected = FutureUtils.collect(noFutures);
+        List<Integer> values = FutureUtils.result(collected);
+        assertTrue(values.isEmpty());
+    }
+
+    /**
+     * Verifies that collecting ten completed futures returns their values in order.
+     */
+    @Test
+    public void testCollectTenItems() throws Exception {
+        final int numItems = 10;
+        List<Integer> expected = Lists.newArrayList();
+        List<CompletableFuture<Integer>> pending = Lists.newArrayList();
+        for (int item = 0; item < numItems; item++) {
+            expected.add(item);
+            pending.add(FutureUtils.value(item));
+        }
+        CompletableFuture<List<Integer>> collected = FutureUtils.collect(pending);
+        List<Integer> actual = FutureUtils.result(collected);
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Verifies that collecting a list containing failed futures fails with the
+     * underlying {@link TestException}.
+     *
+     * <p>Nine of the ten futures are failed; only the last one carries a value.
+     */
+    @Test(expected = TestException.class)
+    public void testCollectFailures() throws Exception {
+        List<CompletableFuture<Integer>> futures = Lists.newArrayList();
+        for (int i = 0; i < 10; i++) {
+            if (i == 9) {
+                futures.add(FutureUtils.value(i));
+            } else {
+                futures.add(FutureUtils.exception(new TestException()));
+            }
+        }
+        // expected to throw TestException
+        FutureUtils.result(FutureUtils.collect(futures));
+    }
+
+    /**
+     * Verifies that {@code FutureUtils.within} schedules no timeout task when the
+     * future it is given is already completed.
+     */
+    @Test
+    public void testWithinAlreadyDone() throws Exception {
+        OrderedScheduler scheduler = mock(OrderedScheduler.class);
+        CompletableFuture<Long> doneFuture = FutureUtils.value(1234L);
+        CompletableFuture<Long> withinFuture = FutureUtils.within(
+            doneFuture,
+            10,
+            TimeUnit.MILLISECONDS,
+            new TestException(),
+            scheduler,
+            1234L);
+        TimeUnit.MILLISECONDS.sleep(20);
+        assertTrue(withinFuture.isDone());
+        assertFalse(withinFuture.isCancelled());
+        assertFalse(withinFuture.isCompletedExceptionally());
+        // The delay must be matched as a long: an Integer matcher never equals the Long
+        // argument, which would let this times(0) verification pass unconditionally.
+        verify(scheduler, times(0))
+            .schedule(eq(1234L), isA(Runnable.class), eq(10L), eq(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Exercises {@code FutureUtils.within} with a zero timeout on a pending future and
+     * checks that the returned future is still pending 20 ms later.
+     *
+     * <p>NOTE(review): the verification below matches a delay of {@code 10} (as an int)
+     * while the timeout passed in is {@code 0}, so it presumably cannot match any real
+     * invocation - confirm the intended expectation against {@code FutureUtils.within}.
+     */
+    @Test
+    public void testWithinZeroTimeout() throws Exception {
+        OrderedScheduler mockedScheduler = mock(OrderedScheduler.class);
+        CompletableFuture<Long> pending = FutureUtils.createFuture();
+        TestException timeoutCause = new TestException();
+        CompletableFuture<Long> bounded = FutureUtils.within(
+            pending,
+            0,
+            TimeUnit.MILLISECONDS,
+            timeoutCause,
+            mockedScheduler,
+            1234L);
+        TimeUnit.MILLISECONDS.sleep(20);
+        assertFalse(bounded.isDone());
+        assertFalse(bounded.isCancelled());
+        assertFalse(bounded.isCompletedExceptionally());
+        verify(mockedScheduler, times(0))
+            .schedule(eq(1234L), isA(Runnable.class), eq(10), eq(TimeUnit.MILLISECONDS));
+    }
+
+    /**
+     * Verifies that when the underlying future completes before the timeout, the
+     * bounded future completes with the same value and the scheduled timeout task
+     * is cancelled exactly once.
+     */
+    @Test
+    public void testWithinCompleteBeforeTimeout() throws Exception {
+        ScheduledFuture<?> timeoutTask = mock(ScheduledFuture.class);
+        OrderedScheduler mockedScheduler = mock(OrderedScheduler.class);
+        when(mockedScheduler.schedule(anyObject(), any(Runnable.class), anyLong(), any(TimeUnit.class)))
+            .thenAnswer(invocationOnMock -> timeoutTask);
+
+        CompletableFuture<Long> pending = FutureUtils.createFuture();
+        CompletableFuture<Long> bounded = FutureUtils.within(
+            pending,
+            Long.MAX_VALUE,
+            TimeUnit.MILLISECONDS,
+            new TestException(),
+            mockedScheduler,
+            1234L);
+        // nothing has completed yet
+        assertFalse(bounded.isDone());
+        assertFalse(bounded.isCancelled());
+        assertFalse(bounded.isCompletedExceptionally());
+
+        pending.complete(5678L);
+
+        // completion of the underlying future is visible through the bounded one
+        assertTrue(bounded.isDone());
+        assertFalse(bounded.isCancelled());
+        assertFalse(bounded.isCompletedExceptionally());
+        assertEquals((Long) 5678L, FutureUtils.result(bounded));
+
+        verify(timeoutTask, times(1))
+            .cancel(eq(true));
+    }
+
+    /**
+     * Verifies that the future returned by {@code FutureUtils.ignore} completes
+     * normally once the underlying future succeeds.
+     */
+    @Test
+    public void testIgnoreSuccess() {
+        CompletableFuture<Long> underlying = FutureUtils.createFuture();
+        CompletableFuture<Void> ignored = FutureUtils.ignore(underlying);
+        underlying.complete(1234L);
+        assertTrue(ignored.isDone());
+        assertFalse(ignored.isCompletedExceptionally());
+        assertFalse(ignored.isCancelled());
+    }
+
+ @Test
+ public void testIgnoreFailure() {
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Void> ignoredFuture = FutureUtils.ignore(underlyFuture);
+ underlyFuture.completeExceptionally(new TestException());
+ assertTrue(ignoredFuture.isDone());
+ assertFalse(ignoredFuture.isCompletedExceptionally());
+ assertFalse(ignoredFuture.isCancelled());
+ }
+
+    /**
+     * Verifies that the action passed to {@code FutureUtils.ensure} runs when the
+     * underlying future succeeds, and that the ensured future completes normally.
+     */
+    @Test
+    public void testEnsureSuccess() throws Exception {
+        CountDownLatch actionRan = new CountDownLatch(1);
+        CompletableFuture<Long> underlying = FutureUtils.createFuture();
+        CompletableFuture<Long> ensured =
+            FutureUtils.ensure(underlying, () -> actionRan.countDown());
+        underlying.complete(1234L);
+        FutureUtils.result(ensured);
+        assertTrue(ensured.isDone());
+        assertFalse(ensured.isCompletedExceptionally());
+        assertFalse(ensured.isCancelled());
+        // returns only once the ensure action has executed
+        actionRan.await();
+    }
+
+    /**
+     * Verifies that the action passed to {@code FutureUtils.ensure} runs when the
+     * underlying future fails, and that the ensured future is completed exceptionally.
+     */
+    @Test
+    public void testEnsureFailure() throws Exception {
+        CountDownLatch actionRan = new CountDownLatch(1);
+        CompletableFuture<Long> underlying = FutureUtils.createFuture();
+        CompletableFuture<Long> ensured =
+            FutureUtils.ensure(underlying, () -> actionRan.countDown());
+        underlying.completeExceptionally(new TestException());
+        // wait for completion without propagating the failure
+        CompletableFuture<Void> ignored = FutureUtils.ignore(ensured);
+        FutureUtils.result(ignored);
+        assertTrue(ensured.isDone());
+        assertTrue(ensured.isCompletedExceptionally());
+        assertFalse(ensured.isCancelled());
+        // returns only once the ensure action has executed
+        actionRan.await();
+    }
+
+    /**
+     * Verifies that the rescue function is never invoked when the underlying
+     * future succeeds.
+     */
+    @Test
+    public void testRescueSuccess() throws Exception {
+        Function<Throwable, CompletableFuture<Long>> rescuer = mock(Function.class);
+        CompletableFuture<Long> underlying = FutureUtils.createFuture();
+        CompletableFuture<Long> rescued = FutureUtils.rescue(underlying, rescuer);
+        underlying.complete(1234L);
+        FutureUtils.result(rescued);
+        assertTrue(rescued.isDone());
+        assertFalse(rescued.isCompletedExceptionally());
+        assertFalse(rescued.isCancelled());
+        verify(rescuer, times(0)).apply(any(Throwable.class));
+    }
+
+ @Test
+ public void testRescueFailure() throws Exception {
+ CompletableFuture<Long> futureCompletedAtRescue = FutureUtils.value(3456L);
+ CompletableFuture<Long> underlyFuture = FutureUtils.createFuture();
+ CompletableFuture<Long> rescuedFuture = FutureUtils.rescue(underlyFuture, (cause) -> futureCompletedAtRescue);
+ underlyFuture.completeExceptionally(new TestException());
+ FutureUtils.result(rescuedFuture);
+ assertTrue(rescuedFuture.isDone());
+ assertFalse(rescuedFuture.isCompletedExceptionally());
+ assertFalse(rescuedFuture.isCancelled());
+ assertEquals((Long) 3456L, FutureUtils.result(rescuedFuture));
+ }
+
+    /**
+     * Verifies that {@code FutureUtils.stats} registers exactly one successful event
+     * when the underlying future succeeds.
+     */
+    @Test
+    public void testStatsSuccess() throws Exception {
+        OpStatsLogger opStats = mock(OpStatsLogger.class);
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        CompletableFuture<Long> underlying = FutureUtils.createFuture();
+        CompletableFuture<Long> measured = FutureUtils.stats(underlying, opStats, stopwatch);
+        underlying.complete(1234L);
+        FutureUtils.result(measured);
+        verify(opStats, times(1)).registerSuccessfulEvent(anyLong());
+    }
+
+    /**
+     * Verifies that {@code FutureUtils.stats} registers exactly one failed event
+     * when the underlying future fails.
+     */
+    @Test
+    public void testStatsFailure() throws Exception {
+        OpStatsLogger opStats = mock(OpStatsLogger.class);
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        CompletableFuture<Long> underlying = FutureUtils.createFuture();
+        CompletableFuture<Long> measured = FutureUtils.stats(underlying, opStats, stopwatch);
+        underlying.completeExceptionally(new TestException());
+        // wait for completion without propagating the failure
+        CompletableFuture<Void> ignored = FutureUtils.ignore(measured);
+        FutureUtils.result(ignored);
+        verify(opStats, times(1)).registerFailedEvent(anyLong());
+    }
+
+    /**
+     * Verifies that {@code FutureUtils.processList} applies the function to every
+     * element and returns the results in order.
+     */
+    @Test
+    public void testProcessListSuccess() throws Exception {
+        List<Long> inputs = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
+        Function<Long, CompletableFuture<Long>> doubler = value -> FutureUtils.value(2 * value);
+        List<Long> expected = Lists.transform(
+            inputs,
+            aLong -> 2 * aLong);
+        CompletableFuture<List<Long>> processed = FutureUtils.processList(inputs, doubler, null);
+        List<Long> actual = FutureUtils.result(processed);
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Verifies that {@code FutureUtils.processList} on an empty list produces an
+     * empty result list.
+     */
+    @Test
+    public void testProcessEmptyList() throws Exception {
+        List<Long> inputs = Lists.newArrayList();
+        Function<Long, CompletableFuture<Long>> doubler = value -> FutureUtils.value(2 * value);
+        List<Long> expected = Lists.transform(
+            inputs,
+            aLong -> 2 * aLong);
+        CompletableFuture<List<Long>> processed = FutureUtils.processList(inputs, doubler, null);
+        List<Long> actual = FutureUtils.result(processed);
+        assertEquals(expected, actual);
+    }
+
+    /**
+     * Verifies that {@code FutureUtils.processList} fails with {@link TestException}
+     * when the function fails for an element, and that only the values below 5
+     * (0 + 1 + 2 + 3 + 4 = 10) were accumulated.
+     */
+    @Test
+    public void testProcessListFailures() throws Exception {
+        AtomicLong accumulated = new AtomicLong(0L);
+        List<Long> inputs = Lists.newArrayList(LongStream.range(0L, 10L).iterator());
+        Function<Long, CompletableFuture<Long>> failingFunc = value -> {
+            // values from 5 upwards are rejected
+            if (value >= 5) {
+                return FutureUtils.exception(new TestException());
+            }
+            accumulated.addAndGet(value);
+            return FutureUtils.value(2 * value);
+        };
+        CompletableFuture<List<Long>> processed = FutureUtils.processList(inputs, failingFunc, null);
+        try {
+            FutureUtils.result(processed);
+            fail("Should fail with TestException");
+        } catch (TestException expected) {
+            // as expected
+        }
+        assertEquals(10L, accumulated.get());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java
new file mode 100644
index 0000000..6b3ca58
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/PropertiesWriter.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.Properties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writer to write properties to files.
+ *
+ * <p>Properties are accumulated in memory via {@link #setProperty(String, String)} and
+ * {@link #removeProperty(String)}; they are written to the backing file only when
+ * {@link #save()} is called.
+ */
+public class PropertiesWriter {
+    static final Logger LOG = LoggerFactory.getLogger(PropertiesWriter.class);
+
+    // NOTE(review): opened by the constructor but never written to or closed in this class.
+    // Kept because the field is package-visible - confirm nothing else uses it before removing.
+    final FileOutputStream outputStream;
+    final File configFile;
+    final Properties properties;
+
+    /**
+     * Creates a writer backed by a newly created temporary file.
+     *
+     * @throws Exception if the temporary file cannot be created or opened
+     */
+    public PropertiesWriter() throws Exception {
+        this(null);
+    }
+
+    /**
+     * Creates a writer backed by the given file.
+     *
+     * @param configFile the file to write to; when {@code null} a temporary file is created.
+     *                   The file is marked for deletion on JVM exit in both cases.
+     * @throws Exception if the file cannot be created or opened
+     */
+    public PropertiesWriter(File configFile) throws Exception {
+        if (null == configFile) {
+            this.configFile = File.createTempFile("temp", ".conf");
+        } else {
+            this.configFile = configFile;
+        }
+        this.configFile.deleteOnExit();
+        this.properties = new Properties();
+        this.outputStream = new FileOutputStream(this.configFile);
+    }
+
+    /**
+     * Sets a property in memory; call {@link #save()} to persist it.
+     */
+    public void setProperty(String key, String value) {
+        properties.setProperty(key, value);
+    }
+
+    /**
+     * Removes a property from memory; call {@link #save()} to persist the removal.
+     */
+    public void removeProperty(String key) {
+        properties.remove(key);
+    }
+
+    /**
+     * Writes the current properties to the backing file and moves the file's
+     * last-modified time forward by one second.
+     *
+     * @throws Exception if the file cannot be opened or written
+     */
+    public void save() throws Exception {
+        // Close the stream before adjusting the timestamp so that no file handle is leaked
+        // and the timestamp set below is the last modification applied to the file.
+        try (FileOutputStream out = new FileOutputStream(configFile)) {
+            properties.store(out, null);
+        }
+        configFile.setLastModified(configFile.lastModified() + 1000);
+        LOG.debug("save modified={}", configFile.lastModified());
+    }
+
+    /**
+     * Returns the file this writer saves to.
+     */
+    public File getFile() {
+        return configFile;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java
new file mode 100644
index 0000000..a54faa0
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConcurrentBaseConfiguration.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import static org.junit.Assert.assertEquals;
+
+import org.junit.Test;
+
+/**
+ * Unit test of {@link ConcurrentBaseConfiguration}.
+ */
+public class TestConcurrentBaseConfiguration {
+
+    /**
+     * Exercises setting, overwriting and clearing properties, and checks that
+     * clearing one property leaves another untouched.
+     */
+    @Test(timeout = 20000)
+    public void testBasicOperations() throws Exception {
+        final String first = "prop1";
+        final String second = "prop2";
+        ConcurrentBaseConfiguration config = new ConcurrentBaseConfiguration();
+
+        // set and overwrite a single property
+        config.setProperty(first, "1");
+        assertEquals(1, config.getInt(first));
+        config.setProperty(first, "2");
+        assertEquals(2, config.getInt(first));
+
+        // a cleared property falls back to the supplied default
+        config.clearProperty(first);
+        assertEquals(null, config.getInteger(first, null));
+
+        // two properties are stored independently
+        config.setProperty(first, "1");
+        config.setProperty(second, "2");
+        assertEquals(1, config.getInt(first));
+        assertEquals(2, config.getInt(second));
+
+        // clearing one of them does not affect the other
+        config.clearProperty(first);
+        assertEquals(null, config.getInteger(first, null));
+        assertEquals(2, config.getInt(second));
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java
new file mode 100644
index 0000000..a474f89
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/config/TestConfigurationSubscription.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.config;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.configuration.CompositeConfiguration;
+import org.apache.commons.configuration.event.ConfigurationEvent;
+import org.apache.commons.configuration.event.ConfigurationListener;
+import org.jmock.lib.concurrent.DeterministicScheduler;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
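+/**
+ * Unit test of {@link ConfigurationSubscription}.
+ *
+ * <p>Note: the granularity of a file's lastModified timestamp is platform dependent
+ * (generally 1 second), so the tests cannot rely on waiting 1 ms for a file change to
+ * get picked up.
+ */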
+public class TestConfigurationSubscription {
+ static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class);
+
+    /**
+     * Pauses briefly to give FileChangedReloadingStrategy some time to start reloading.
+     *
+     * <p>The pause makes sure now != lastChecked, see
+     * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()}.
+     */
+    private void ensureConfigReloaded() throws InterruptedException {
+        // wait 1 ms so that System.currentTimeMillis() differs from lastChecked
+        // (the time at which FileChangedReloadingStrategy was constructed)
+        TimeUnit.MILLISECONDS.sleep(1);
+    }
+
+    /**
+     * Verifies that an explicit {@code reload()} picks up a property written to the
+     * file and hands the subscribed configuration to the registered listener.
+     */
+    @Test(timeout = 60000)
+    public void testReloadConfiguration() throws Exception {
+        PropertiesWriter propsWriter = new PropertiesWriter();
+        DeterministicScheduler scheduler = new DeterministicScheduler();
+        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration());
+        FileConfigurationBuilder fileBuilder =
+            new PropertiesConfigurationBuilder(propsWriter.getFile().toURI().toURL());
+        List<FileConfigurationBuilder> builders = Lists.newArrayList(fileBuilder);
+        ConfigurationSubscription subscription =
+            new ConfigurationSubscription(conf, builders, scheduler, 100, TimeUnit.MILLISECONDS);
+
+        final AtomicReference<ConcurrentBaseConfiguration> reloaded = new AtomicReference<>();
+        subscription.registerListener(new org.apache.distributedlog.common.config.ConfigurationListener() {
+            @Override
+            public void onReload(ConcurrentBaseConfiguration reloadedConf) {
+                reloaded.set(reloadedConf);
+            }
+        });
+        assertEquals(null, conf.getProperty("prop1"));
+
+        // add a property and persist it
+        propsWriter.setProperty("prop1", "1");
+        propsWriter.save();
+        // ensure the file change reloading event can be triggered
+        ensureConfigReloaded();
+        // reload the config
+        subscription.reload();
+        assertNotNull(reloaded.get());
+        assertTrue(conf == reloaded.get());
+        assertEquals("1", conf.getProperty("prop1"));
+    }
+
+    /**
+     * Verifies that a property added to the file shows up in the configuration after
+     * the scheduler is advanced by one reload interval.
+     */
+    @Test(timeout = 60000)
+    public void testAddReloadBasicsConfig() throws Exception {
+        PropertiesWriter propsWriter = new PropertiesWriter();
+        DeterministicScheduler scheduler = new DeterministicScheduler();
+        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration());
+        FileConfigurationBuilder fileBuilder =
+            new PropertiesConfigurationBuilder(propsWriter.getFile().toURI().toURL());
+        List<FileConfigurationBuilder> builders = Lists.newArrayList(fileBuilder);
+        ConfigurationSubscription subscription =
+            new ConfigurationSubscription(conf, builders, scheduler, 100, TimeUnit.MILLISECONDS);
+        assertEquals(null, conf.getProperty("prop1"));
+
+        // add a property and persist it
+        propsWriter.setProperty("prop1", "1");
+        propsWriter.save();
+        // ensure the file change reloading event can be triggered
+        ensureConfigReloaded();
+        // advance the deterministic scheduler by one reload interval
+        scheduler.tick(100, TimeUnit.MILLISECONDS);
+        assertEquals("1", conf.getProperty("prop1"));
+    }
+
+    /**
+     * Verifies that properties already present in the file are readable, with the
+     * expected types, right after the subscription is constructed.
+     */
+    @Test(timeout = 60000)
+    public void testInitialConfigLoad() throws Exception {
+        // persist the properties before the subscription exists
+        PropertiesWriter propsWriter = new PropertiesWriter();
+        propsWriter.setProperty("prop1", "1");
+        propsWriter.setProperty("prop2", "abc");
+        propsWriter.setProperty("prop3", "123.0");
+        propsWriter.setProperty("prop4", "11132");
+        propsWriter.setProperty("prop5", "true");
+        propsWriter.save();
+
+        ScheduledExecutorService scheduler = new DeterministicScheduler();
+        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration());
+        FileConfigurationBuilder fileBuilder =
+            new PropertiesConfigurationBuilder(propsWriter.getFile().toURI().toURL());
+        List<FileConfigurationBuilder> builders = Lists.newArrayList(fileBuilder);
+        ConfigurationSubscription subscription =
+            new ConfigurationSubscription(conf, builders, scheduler, 100, TimeUnit.MILLISECONDS);
+
+        assertEquals(1, conf.getInt("prop1"));
+        assertEquals("abc", conf.getString("prop2"));
+        assertEquals(123.0, conf.getFloat("prop3"), 0);
+        assertEquals(11132, conf.getInt("prop4"));
+        assertEquals(true, conf.getBoolean("prop5"));
+    }
+
+    /**
+     * Verifies that configuration changes keep being delivered even though the
+     * registered listener throws on every update.
+     *
+     * <p>The test waits for the listener to fire twice: once to observe the first
+     * (throwing) notification and once more to show that a later file change is
+     * still picked up. Each wait rewrites the file and advances the scheduler until
+     * the listener's counter moves; the JUnit timeout bounds the overall wait.
+     */
+    @Test(timeout = 60000)
+    public void testExceptionInConfigLoad() throws Exception {
+        PropertiesWriter writer = new PropertiesWriter();
+        writer.setProperty("prop1", "1");
+        writer.save();
+
+        DeterministicScheduler mockScheduler = new DeterministicScheduler();
+        FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
+        ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new CompositeConfiguration());
+        List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
+        ConfigurationSubscription confSub =
+            new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
+
+        final AtomicInteger count = new AtomicInteger(1);
+        conf.addConfigurationListener(new ConfigurationListener() {
+            @Override
+            public void configurationChanged(ConfigurationEvent event) {
+                LOG.info("config changed {}", event);
+                // Throw after so we actually see the update anyway.
+                if (!event.isBeforeUpdate()) {
+                    count.getAndIncrement();
+                    throw new RuntimeException("config listener threw and exception");
+                }
+            }
+        });
+
+        int i = 0;
+        // Snapshot the counter rather than hard-coding 0: the counter starts at 1, so
+        // comparing against 0 would skip this first wait entirely.
+        int initial = count.get();
+        while (count.get() == initial) {
+            writer.setProperty("prop1", Integer.toString(i++));
+            writer.save();
+            mockScheduler.tick(100, TimeUnit.MILLISECONDS);
+        }
+
+        // wait for a second notification, i.e. one delivered after the listener has thrown
+        initial = count.get();
+        while (count.get() == initial) {
+            writer.setProperty("prop1", Integer.toString(i++));
+            writer.save();
+            mockScheduler.tick(100, TimeUnit.MILLISECONDS);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java
new file mode 100644
index 0000000..7a981d1
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TestTimedOutTestsListener.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.notification.Failure;
+
+/**
+ * Test Case for {@link TimedOutTestsListener}.
+ *
+ * <p>The test deliberately deadlocks six daemon threads and then checks the deadlock
+ * report and the thread dump that the listener produces.
+ */
+public class TestTimedOutTestsListener {
+
+ /**
+ * Creates and starts six daemon threads that deadlock in two three-way cycles:
+ * threads 1-3 contend on object monitors ({@code a -> b -> c -> a}) and threads 4-6
+ * contend on {@link ReentrantLock}s ({@code d -> e -> f -> d}).
+ */
+ private static class Deadlock {
+ // Shared by all six threads; each thread awaits it twice (once after taking its first
+ // lock and once right before trying to take its second lock).
+ private CyclicBarrier barrier = new CyclicBarrier(6);
+
+ public Deadlock() {
+ DeadlockThread[] dThreads = new DeadlockThread[6];
+
+ // Monitor cycle: each thread holds one monitor and wants the next thread's monitor.
+ Monitor a = new Monitor("a");
+ Monitor b = new Monitor("b");
+ Monitor c = new Monitor("c");
+ dThreads[0] = new DeadlockThread("MThread-1", a, b);
+ dThreads[1] = new DeadlockThread("MThread-2", b, c);
+ dThreads[2] = new DeadlockThread("MThread-3", c, a);
+
+ // Same cycle shape, but built from java.util.concurrent locks.
+ Lock d = new ReentrantLock();
+ Lock e = new ReentrantLock();
+ Lock f = new ReentrantLock();
+
+ dThreads[3] = new DeadlockThread("SThread-4", d, e);
+ dThreads[4] = new DeadlockThread("SThread-5", e, f);
+ dThreads[5] = new DeadlockThread("SThread-6", f, d);
+
+ // make them daemon threads so that the test will exit
+ for (int i = 0; i < 6; i++) {
+ dThreads[i].setDaemon(true);
+ dThreads[i].start();
+ }
+ }
+
+ /**
+ * Thread that acquires its first lock, waits at the barrier, and then tries to acquire
+ * its second lock. Depending on the constructor used it works with either a pair of
+ * {@link Lock}s or a pair of {@link Monitor}s.
+ */
+ class DeadlockThread extends Thread {
+ private Lock lock1 = null;
+
+ private Lock lock2 = null;
+
+ private Monitor mon1 = null;
+
+ private Monitor mon2 = null;
+
+ // true: use lock1/lock2 via syncLock(); false: use mon1/mon2 via monitorLock().
+ private boolean useSync;
+
+ DeadlockThread(String name, Lock lock1, Lock lock2) {
+ super(name);
+ this.lock1 = lock1;
+ this.lock2 = lock2;
+ this.useSync = true;
+ }
+
+ DeadlockThread(String name, Monitor mon1, Monitor mon2) {
+ super(name);
+ this.mon1 = mon1;
+ this.mon2 = mon2;
+ this.useSync = false;
+ }
+
+ public void run() {
+ if (useSync) {
+ syncLock();
+ } else {
+ monitorLock();
+ }
+ }
+
+ // Takes lock1, then waits until all six threads have reached the barrier, i.e. until
+ // every thread holds its first lock, before going for lock2.
+ private void syncLock() {
+ lock1.lock();
+ try {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ // any barrier exception is ignored
+ }
+ goSyncDeadlock();
+ } finally {
+ lock1.unlock();
+ }
+ }
+
+ // Second rendezvous, then blocks on lock2, which is held by the next thread in the cycle.
+ // The throw is only reached if the deadlock unexpectedly does not occur.
+ private void goSyncDeadlock() {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ // any barrier exception is ignored
+ }
+ lock2.lock();
+ throw new RuntimeException("should not reach here.");
+ }
+
+ // Monitor-based counterpart of syncLock(): holds mon1 while waiting at the barrier.
+ private void monitorLock() {
+ synchronized (mon1) {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ // any barrier exception is ignored
+ }
+ goMonitorDeadlock();
+ }
+ }
+
+ // Monitor-based counterpart of goSyncDeadlock(): blocks entering mon2 while still
+ // holding mon1. The throw is only reached if the deadlock unexpectedly does not occur.
+ private void goMonitorDeadlock() {
+ try {
+ barrier.await();
+ } catch (Exception e) {
+ // any barrier exception is ignored
+ }
+ synchronized (mon2) {
+ throw new RuntimeException(getName() + " should not reach here.");
+ }
+ }
+ }
+
+ /** Named object whose intrinsic lock is used in {@code synchronized} blocks. */
+ class Monitor {
+ String name;
+
+ Monitor(String name) {
+ this.name = name;
+ }
+ }
+
+ }
+
+ /**
+ * Starts the deadlocked threads, polls until {@link TimedOutTestsListener#buildDeadlockInfo()}
+ * reports a deadlock, and then verifies both the deadlock report and the output the listener
+ * writes for a timeout failure.
+ */
+ // NOTE(review): the 500 ms timeout leaves room for only a few 100 ms polls below; this may be
+ // tight on a slow or heavily loaded machine - confirm.
+ @Test(timeout = 500)
+ public void testThreadDumpAndDeadlocks() throws Exception {
+ new Deadlock();
+ String s = null;
+ // Poll every 100 ms until a deadlock report is available.
+ while (true) {
+ s = TimedOutTestsListener.buildDeadlockInfo();
+ if (s != null) {
+ break;
+ }
+ Thread.sleep(100);
+ }
+
+ // NOTE(review): presumably the three matches are the three monitor-cycle threads; the
+ // ReentrantLock waiters are not expected to show up as BLOCKED - confirm.
+ Assert.assertEquals(3, countStringOccurrences(s, "BLOCKED"));
+
+ // A failure whose message starts with the timeout prefix makes the listener print its dump.
+ Failure failure = new Failure(null, new Exception(TimedOutTestsListener.TEST_TIMED_OUT_PREFIX));
+ StringWriter writer = new StringWriter();
+ new TimedOutTestsListener(new PrintWriter(writer)).testFailure(failure);
+ String out = writer.toString();
+
+ Assert.assertTrue(out.contains("THREAD DUMP"));
+ Assert.assertTrue(out.contains("DEADLOCKS DETECTED"));
+
+ System.out.println(out);
+ }
+
+ /**
+ * Counts how many times {@code substr} occurs in {@code s}. The search resumes one character
+ * after the start of the previous match, so overlapping occurrences are counted.
+ *
+ * @param s the text to search
+ * @param substr the text to look for
+ * @return the number of occurrences found
+ */
+ private int countStringOccurrences(String s, String substr) {
+ int n = 0;
+ int index = 0;
+ // indexOf returns -1 when there is no further match, which makes (index + 1) == 0.
+ while ((index = s.indexOf(substr, index) + 1) != 0) {
+ n++;
+ }
+ return n;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java
new file mode 100644
index 0000000..c86cf8f
--- /dev/null
+++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/util/TimedOutTestsListener.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.common.util;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.lang.management.ThreadMXBean;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.junit.runner.notification.Failure;
+import org.junit.runner.notification.RunListener;
+
+/**
+ * JUnit run listener which prints a full thread dump, plus any detected monitor deadlocks,
+ * when a test fails due to a timeout.
+ *
+ * <p>A failure counts as a timeout when its message starts with {@code "test timed out after"}.
+ * Output goes to the {@link PrintWriter} given at construction, or to {@code System.err} when
+ * the no-arg constructor is used.
+ */
+public class TimedOutTestsListener extends RunListener {
+
+ /** Prefix of the failure message that marks a failure as a timeout. */
+ static final String TEST_TIMED_OUT_PREFIX = "test timed out after";
+
+ /** Indentation used for nested lines of the deadlock report. */
+ private static final String INDENT = " ";
+
+ private final PrintWriter output;
+
+ /** Creates a listener that writes its diagnostics to {@code System.err}. */
+ public TimedOutTestsListener() {
+ this.output = new PrintWriter(System.err);
+ }
+
+ /**
+ * Creates a listener that writes its diagnostics to the given writer.
+ *
+ * @param output destination for the thread dump
+ */
+ public TimedOutTestsListener(PrintWriter output) {
+ this.output = output;
+ }
+
+ /**
+ * Prints a thread dump if the given failure was caused by a test timeout; otherwise does nothing.
+ *
+ * @param failure the reported failure; a {@code null} failure or {@code null} message is ignored
+ */
+ @Override
+ public void testFailure(Failure failure) throws Exception {
+ if (failure != null && failure.getMessage() != null && failure.getMessage().startsWith(TEST_TIMED_OUT_PREFIX)) {
+ output.println("====> TEST TIMED OUT. PRINTING THREAD DUMP. <====");
+ output.println();
+ output.print(buildThreadDiagnosticString());
+ // The writer is not auto-flushing, so push the dump out explicitly; otherwise it can
+ // sit in the buffer and never reach the underlying stream.
+ output.flush();
+ }
+ }
+
+ /**
+ * Builds the full diagnostic text: a timestamp, a dump of all threads and, if any monitor
+ * deadlock is detected, a deadlock section.
+ *
+ * @return the diagnostic text
+ */
+ public static String buildThreadDiagnosticString() {
+ StringWriter sw = new StringWriter();
+ PrintWriter output = new PrintWriter(sw);
+
+ // HH = hour of day (0-23); the 12-hour "hh" would be ambiguous without an AM/PM marker.
+ DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+ output.println(String.format("Timestamp: %s", dateFormat.format(new Date())));
+ output.println();
+ output.println(buildThreadDump());
+
+ String deadlocksInfo = buildDeadlockInfo();
+ if (deadlocksInfo != null) {
+ output.println("====> DEADLOCKS DETECTED <====");
+ output.println();
+ output.println(deadlocksInfo);
+ }
+
+ return sw.toString();
+ }
+
+ /**
+ * Renders the name, daemon flag, priority, id, state and stack trace of every live thread.
+ *
+ * @return the thread dump text
+ */
+ static String buildThreadDump() {
+ StringBuilder dump = new StringBuilder();
+ Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
+ for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) {
+ Thread thread = e.getKey();
+ // Sample the state once so both state columns of this line describe the same moment.
+ Thread.State state = thread.getState();
+ boolean waiting = Thread.State.WAITING.equals(state);
+ dump.append(String.format("\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s", thread.getName(),
+ (thread.isDaemon() ? "daemon" : ""), thread.getPriority(), thread.getId(),
+ waiting ? "in Object.wait()" : StringUtils.lowerCase(state.name()),
+ waiting ? "WAITING (on object monitor)" : state));
+ for (StackTraceElement stackTraceElement : e.getValue()) {
+ dump.append("\n at ");
+ dump.append(stackTraceElement);
+ }
+ dump.append("\n");
+ }
+ return dump.toString();
+ }
+
+ /**
+ * Describes the threads that {@link ThreadMXBean#findMonitorDeadlockedThreads()} reports as
+ * deadlocked, including their stack traces, locked monitors and locked synchronizers.
+ *
+ * @return the deadlock report, or {@code null} when no deadlocked thread is reported
+ */
+ static String buildDeadlockInfo() {
+ ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
+ long[] threadIds = threadBean.findMonitorDeadlockedThreads();
+ if (threadIds != null && threadIds.length > 0) {
+ StringWriter stringWriter = new StringWriter();
+ PrintWriter out = new PrintWriter(stringWriter);
+
+ ThreadInfo[] infos = threadBean.getThreadInfo(threadIds, true, true);
+ for (ThreadInfo ti : infos) {
+ // getThreadInfo leaves a null element for a thread that is no longer alive.
+ if (ti == null) {
+ continue;
+ }
+ printThreadInfo(ti, out);
+ printLockInfo(ti.getLockedSynchronizers(), out);
+ out.println();
+ }
+
+ out.close();
+ return stringWriter.toString();
+ } else {
+ return null;
+ }
+ }
+
+ /** Prints the thread header followed by its stack trace, annotated with the monitors locked per frame. */
+ private static void printThreadInfo(ThreadInfo ti, PrintWriter out) {
+ // print thread information
+ printThread(ti, out);
+
+ // print stack trace with locks
+ StackTraceElement[] stacktrace = ti.getStackTrace();
+ MonitorInfo[] monitors = ti.getLockedMonitors();
+ for (int i = 0; i < stacktrace.length; i++) {
+ StackTraceElement ste = stacktrace[i];
+ out.println(INDENT + "at " + ste.toString());
+ for (MonitorInfo mi : monitors) {
+ if (mi.getLockedStackDepth() == i) {
+ out.println(INDENT + " - locked " + mi);
+ }
+ }
+ }
+ out.println();
+ }
+
+ /** Prints one header line for the thread (name, id, state, lock, flags) and the lock owner, if any. */
+ private static void printThread(ThreadInfo ti, PrintWriter out) {
+ out.print("\"" + ti.getThreadName() + "\"" + " Id=" + ti.getThreadId() + " in " + ti.getThreadState());
+ if (ti.getLockName() != null) {
+ out.print(" on lock=" + ti.getLockName());
+ }
+ if (ti.isSuspended()) {
+ out.print(" (suspended)");
+ }
+ if (ti.isInNative()) {
+ out.print(" (running in native)");
+ }
+ out.println();
+ if (ti.getLockOwnerName() != null) {
+ out.println(INDENT + " owned by " + ti.getLockOwnerName() + " Id=" + ti.getLockOwnerId());
+ }
+ }
+
+ /** Prints the number of locked synchronizers followed by one line per synchronizer. */
+ private static void printLockInfo(LockInfo[] locks, PrintWriter out) {
+ out.println(INDENT + "Locked synchronizers: count = " + locks.length);
+ for (LockInfo li : locks) {
+ out.println(INDENT + " - " + li);
+ }
+ out.println();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/pom.xml b/distributedlog-core-twitter/pom.xml
new file mode 100644
index 0000000..41116b9
--- /dev/null
+++ b/distributedlog-core-twitter/pom.xml
@@ -0,0 +1,141 @@
+<?xml version="1.0"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog</artifactId>
+ <version>0.5.0-incubating-SNAPSHOT</version>
+ </parent>
+ <artifactId>distributedlog-core-twitter</artifactId>
+ <name>Apache DistributedLog :: Core Library (Twitter Future Interface)</name>
+ <dependencies>
+ <!-- The DistributedLog core library whose API this module exposes through Twitter futures. -->
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-core</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <!-- Twitter util-core (Scala 2.11 build). -->
+ <!-- NOTE(review): the version comes from the finagle.version property; confirm util-core is released under the same version number. -->
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>util-core_2.11</artifactId>
+ <version>${finagle.version}</version>
+ </dependency>
+ <!-- Test-only dependencies. -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- Test classes of distributedlog-common; presumably pulled in for the TimedOutTestsListener that the surefire configuration of this module registers. -->
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-common</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>${maven-compiler-plugin.version}</version>
+ </plugin>
+ <!-- Additionally package this module's test classes as a test-jar. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>${maven-jar-plugin.version}</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- Test execution: output is redirected to files, forked processes are limited to 1800 seconds, and a run listener is registered. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>${maven-surefire-plugin.version}</version>
+ <configuration>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <argLine>-Xmx3G -Djava.net.preferIPv4Stack=true -XX:MaxDirectMemorySize=2G</argLine>
+ <!-- NOTE(review): forkMode is deprecated in newer surefire releases in favour of forkCount/reuseForks; check against the configured surefire version. -->
+ <forkMode>always</forkMode>
+ <forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
+ <properties>
+ <!-- Registers the listener that prints a thread dump when a test times out. -->
+ <property>
+ <name>listener</name>
+ <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value>
+ </property>
+ </properties>
+ </configuration>
+ </plugin>
+ <!-- NOTE(review): the exclude filter is resolved relative to this module's basedir; confirm src/main/resources/findbugsExclude.xml exists in this module. -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>findbugs-maven-plugin</artifactId>
+ <configuration>
+ <excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
+ </configuration>
+ </plugin>
+ <!-- Checkstyle runs in the test-compile phase, covers test sources as well, and fails the build on any violation. -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>${maven-checkstyle-plugin.version}</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>${puppycrawl.checkstyle.version}</version>
+ </dependency>
+ <!-- Presumably supplies the distributedlog/checkstyle.xml and distributedlog/suppressions.xml referenced below. -->
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-build-tools</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>distributedlog/checkstyle.xml</configLocation>
+ <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
+ <consoleOutput>true</consoleOutput>
+ <failOnViolation>true</failOnViolation>
+ <includeResources>false</includeResources>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java
new file mode 100644
index 0000000..4ec1dfa
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReader.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.twitter.util.Future;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A log reader that reads records asynchronously, returning Twitter {@link Future}s.
+ */
+public interface AsyncLogReader {
+
+ /**
+ * Get the name of the stream that the reader reads from.
+ *
+ * @return stream name.
+ */
+ String getStreamName();
+
+ /**
+ * Read the next record from the log stream.
+ *
+ * @return A promise that when satisfied will contain the Log Record with its DLSN.
+ */
+ Future<LogRecordWithDLSN> readNext();
+
+ /**
+ * Read the next <i>numEntries</i> entries. The future is only satisfied with a non-empty list
+ * of entries. It does not block until exactly <i>numEntries</i> entries are available; it is a
+ * best-effort call.
+ *
+ * @param numEntries
+ * number of entries to read
+ * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
+ */
+ Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
+
+ /**
+ * Read the next <i>numEntries</i> entries within a given <i>waitTime</i>.
+ *
+ * <p>The future is satisfied when either <i>numEntries</i> entries have been read or <i>waitTime</i>
+ * has elapsed. The only exception is when no new entries are written within <i>waitTime</i>: in that
+ * case it waits until new entries are available.
+ *
+ * @param numEntries
+ * max entries to return
+ * @param waitTime
+ * maximum wait time if there are entries already available to read
+ * @param timeUnit
+ * wait time unit
+ * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
+ */
+ Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
+
+ /**
+ * Closes this source and releases any system resources associated
+ * with it. If the source is already closed then invoking this
+ * method has no effect.
+ *
+ * @return future representing the close result.
+ */
+ Future<Void> asyncClose();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java
new file mode 100644
index 0000000..4f4a90e
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogReaderImpl.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.apache.distributedlog.util.FutureUtils.newTFuture;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.util.Future;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * {@link AsyncLogReader} that forwards every call to an
+ * {@link org.apache.distributedlog.api.AsyncLogReader} and hands the results back as
+ * Twitter {@link Future}s.
+ */
+class AsyncLogReaderImpl implements AsyncLogReader {
+
+ /** Scala function that wraps an api-level reader in an {@link AsyncLogReaderImpl}. */
+ static final Function1<org.apache.distributedlog.api.AsyncLogReader, AsyncLogReader> MAP_FUNC =
+ new AbstractFunction1<org.apache.distributedlog.api.AsyncLogReader, AsyncLogReader>() {
+ @Override
+ public AsyncLogReader apply(org.apache.distributedlog.api.AsyncLogReader apiReader) {
+ return new AsyncLogReaderImpl(apiReader);
+ }
+ };
+
+ /** The wrapped reader that performs the actual reads. */
+ private final org.apache.distributedlog.api.AsyncLogReader delegate;
+
+ AsyncLogReaderImpl(org.apache.distributedlog.api.AsyncLogReader delegate) {
+ this.delegate = delegate;
+ }
+
+ /** Exposes the wrapped reader to tests. */
+ @VisibleForTesting
+ org.apache.distributedlog.api.AsyncLogReader getImpl() {
+ return this.delegate;
+ }
+
+ @Override
+ public String getStreamName() {
+ return this.delegate.getStreamName();
+ }
+
+ @Override
+ public Future<LogRecordWithDLSN> readNext() {
+ return newTFuture(this.delegate.readNext());
+ }
+
+ @Override
+ public Future<List<LogRecordWithDLSN>> readBulk(int numEntries) {
+ return newTFuture(this.delegate.readBulk(numEntries));
+ }
+
+ @Override
+ public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit) {
+ return newTFuture(this.delegate.readBulk(numEntries, waitTime, timeUnit));
+ }
+
+ @Override
+ public Future<Void> asyncClose() {
+ return newTFuture(this.delegate.asyncClose());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
new file mode 100644
index 0000000..915877c
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.twitter.util.Future;
+import java.util.List;
+
+/**
+ * A writer that appends log records asynchronously, returning Twitter {@link Future}s.
+ */
+public interface AsyncLogWriter {
+
+ /**
+ * Get the last committed transaction id.
+ *
+ * @return last committed transaction id.
+ */
+ long getLastTxId();
+
+ /**
+ * Write a log record to the stream.
+ *
+ * @param record single log record
+ * @return A Future which contains a DLSN if the record was successfully written
+ * or an exception if the write fails
+ */
+ Future<DLSN> write(LogRecord record);
+
+ /**
+ * Write log records to the stream in bulk. Each future in the list represents the result of
+ * one write operation. The size of the result list is equal to the size of the input list.
+ * Buffers are written in order, and the list of result futures has the same order.
+ *
+ * @param record list of log records to write (despite the singular parameter name)
+ * @return A Future which contains a list of Future DLSNs, one per input record, if the operation
+ * succeeds, or an exception if the operation fails.
+ */
+ Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);
+
+ /**
+ * Truncate the log until <i>dlsn</i>.
+ *
+ * @param dlsn
+ * dlsn to truncate until.
+ * @return A Future that indicates whether the operation succeeded or not, or an exception
+ * if the truncation fails.
+ */
+ Future<Boolean> truncate(DLSN dlsn);
+
+ /**
+ * Get the name of the stream this writer writes data to.
+ *
+ * @return stream name.
+ */
+ String getStreamName();
+
+ /**
+ * Closes this source and releases any system resources associated
+ * with it. If the source is already closed then invoking this
+ * method has no effect.
+ *
+ * @return future representing the close result.
+ */
+ Future<Void> asyncClose();
+
+ /**
+ * Aborts the object and releases any resources associated with it.
+ * If the object is already aborted then invoking this method has no
+ * effect.
+ *
+ * @return future representing the abort result
+ */
+ Future<Void> asyncAbort();
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java
new file mode 100644
index 0000000..dc28bb1
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/AsyncLogWriterImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.apache.distributedlog.util.FutureUtils.newTFuture;
+import static org.apache.distributedlog.util.FutureUtils.newTFutureList;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.twitter.util.Future;
+import java.util.List;
+import scala.Function1;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * {@link AsyncLogWriter} that forwards every call to an
+ * {@link org.apache.distributedlog.api.AsyncLogWriter} and hands the results back as
+ * Twitter {@link Future}s.
+ */
+class AsyncLogWriterImpl implements AsyncLogWriter {
+
+ /** Scala function that wraps an api-level writer in an {@link AsyncLogWriterImpl}. */
+ static final Function1<org.apache.distributedlog.api.AsyncLogWriter, AsyncLogWriter> MAP_FUNC =
+ new AbstractFunction1<org.apache.distributedlog.api.AsyncLogWriter, AsyncLogWriter>() {
+ @Override
+ public AsyncLogWriter apply(org.apache.distributedlog.api.AsyncLogWriter apiWriter) {
+ return new AsyncLogWriterImpl(apiWriter);
+ }
+ };
+
+ /** The wrapped writer that performs the actual writes. */
+ private final org.apache.distributedlog.api.AsyncLogWriter delegate;
+
+ AsyncLogWriterImpl(org.apache.distributedlog.api.AsyncLogWriter delegate) {
+ this.delegate = delegate;
+ }
+
+ /** Exposes the wrapped writer to tests. */
+ @VisibleForTesting
+ org.apache.distributedlog.api.AsyncLogWriter getImpl() {
+ return this.delegate;
+ }
+
+ @Override
+ public long getLastTxId() {
+ return this.delegate.getLastTxId();
+ }
+
+ @Override
+ public Future<DLSN> write(LogRecord record) {
+ return newTFuture(this.delegate.write(record));
+ }
+
+ @Override
+ public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record) {
+ return newTFutureList(this.delegate.writeBulk(record));
+ }
+
+ @Override
+ public Future<Boolean> truncate(DLSN dlsn) {
+ return newTFuture(this.delegate.truncate(dlsn));
+ }
+
+ @Override
+ public String getStreamName() {
+ return this.delegate.getStreamName();
+ }
+
+ @Override
+ public Future<Void> asyncClose() {
+ return newTFuture(this.delegate.asyncClose());
+ }
+
+ @Override
+ public Future<Void> asyncAbort() {
+ return newTFuture(this.delegate.asyncAbort());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java
new file mode 100644
index 0000000..14f05c3
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManager.java
@@ -0,0 +1,317 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog;
+
+import com.twitter.util.Future;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+
+/**
+ * A DistributedLogManager is responsible for managing a single place of storing
+ * edit logs. It may correspond to multiple files, a backup node, etc.
+ * Even when the actual underlying storage is rolled, or failed and restored,
+ * each conceptual place of storage corresponds to exactly one instance of
+ * this class, which is created when the EditLog is first opened.
+ */
+public interface DistributedLogManager extends Closeable {
+
+ /**
+ * Get the name of the stream managed by this log manager.
+ *
+ * @return the stream name
+ */
+ String getStreamName();
+
+ /**
+ * Get the namespace driver used by this manager.
+ *
+ * @return the namespace driver
+ */
+ NamespaceDriver getNamespaceDriver();
+
+ /**
+ * Get log segments.
+ *
+ * @return log segments
+ * @throws IOException if the log segments cannot be retrieved
+ */
+ List<LogSegmentMetadata> getLogSegments() throws IOException;
+
+ /**
+ * Register <i>listener</i> on log segment updates of this stream.
+ *
+ * @param listener listener to receive the updated log segment list
+ * @throws IOException if the listener cannot be registered
+ */
+ void registerListener(LogSegmentListener listener) throws IOException;
+
+ /**
+ * Unregister <i>listener</i> on log segment updates from this stream.
+ *
+ * @param listener
+ * the listener that should stop receiving log segment list updates.
+ */
+ void unregisterListener(LogSegmentListener listener);
+
+ /**
+ * Open async log writer to write records to the log stream.
+ *
+ * @return future that represents the result of opening the writer
+ */
+ Future<AsyncLogWriter> openAsyncLogWriter();
+
+ /**
+ * Begin writing to the log stream identified by the name.
+ *
+ * @return the writer interface to generate log records
+ */
+ LogWriter startLogSegmentNonPartitioned() throws IOException;
+
+ /**
+ * Begin writing to the log stream identified by the name, using an async writer.
+ *
+ * @return the async writer interface to generate log records
+ */
+ // @Deprecated (annotation is disabled); openAsyncLogWriter() is the Future-returning counterpart
+ AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;
+
+ /**
+ * Begin appending to the end of the log stream which is being treated as a sequence of bytes.
+ *
+ * @return the writer interface to generate log records
+ */
+ AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;
+
+ /**
+ * Get a reader to read a log stream as a sequence of bytes.
+ *
+ * @return the reader interface to read the log stream as bytes
+ */
+ AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;
+
+ /**
+ * Get the input stream starting with fromTxnId for the specified log.
+ *
+ * @param fromTxnId - the first transaction id we want to read
+ * @return the stream starting with transaction fromTxnId
+ * @throws IOException if a stream cannot be found.
+ */
+ LogReader getInputStream(long fromTxnId)
+ throws IOException;
+
+ LogReader getInputStream(DLSN fromDLSN) throws IOException; // DLSN-based overload of getInputStream(long); presumably starts reading at fromDLSN -- confirm
+
+ /**
+ * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
+ *
+ * @param fromTxnId
+ * transaction id to start reading from
+ * @return async log reader
+ */
+ Future<AsyncLogReader> openAsyncLogReader(long fromTxnId);
+
+ /**
+ * Open an async log reader to read records from a log starting from <code>fromDLSN</code>.
+ *
+ * @param fromDLSN
+ * dlsn to start reading from
+ * @return async log reader
+ */
+ Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);
+
+ // @Deprecated (annotation is disabled); openAsyncLogReader(long) is the Future-returning counterpart
+ AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;
+
+ // @Deprecated (annotation is disabled); openAsyncLogReader(DLSN) is the Future-returning counterpart
+ AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;
+
+ Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN); // overload without a subscriberId; the documented overloads follow
+
+ /**
+ * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
+ * If two readers tried to open using same subscriberId, one would succeed, while the other
+ * will be blocked until it gets the lock.
+ *
+ * @param fromDLSN
+ * start dlsn
+ * @param subscriberId
+ * subscriber id
+ * @return async log reader
+ */
+ Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);
+
+ /**
+ * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
+ * its last commit position recorded in subscription store. If no last commit position found
+ * in subscription store, it would start reading from head of the stream.
+ *
+ * <p>If two readers tried to open using same subscriberId, one would succeed, while the other
+ * will be blocked until it gets the lock.
+ *
+ * @param subscriberId
+ * subscriber id
+ * @return async log reader
+ */
+ Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);
+
+ /**
+ * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
+ *
+ * @param transactionId
+ * transaction id
+ * @return dlsn of first log record whose transaction id is not less than transactionId.
+ */
+ Future<DLSN> getDLSNNotLessThanTxId(long transactionId);
+
+ /**
+ * Get the last log record in the stream.
+ *
+ * @return the last log record in the stream
+ * @throws IOException if a stream cannot be found.
+ */
+ LogRecordWithDLSN getLastLogRecord()
+ throws IOException;
+
+ /**
+ * Get the earliest Transaction Id available in the log.
+ *
+ * @return earliest transaction id
+ * @throws IOException if the earliest transaction id cannot be retrieved
+ */
+ long getFirstTxId() throws IOException;
+
+ /**
+ * Get Latest Transaction Id in the log.
+ *
+ * @return latest transaction id
+ * @throws IOException if the latest transaction id cannot be retrieved
+ */
+ long getLastTxId() throws IOException;
+
+ /**
+ * Get Latest DLSN in the log.
+ *
+ * @return last dlsn
+ * @throws IOException if the last dlsn cannot be retrieved
+ */
+ DLSN getLastDLSN() throws IOException;
+
+ /**
+ * Get Latest log record with DLSN in the log - async.
+ *
+ * @return latest log record with DLSN
+ */
+ Future<LogRecordWithDLSN> getLastLogRecordAsync();
+
+ /**
+ * Get Latest Transaction Id in the log - async.
+ *
+ * @return latest transaction id
+ */
+ Future<Long> getLastTxIdAsync();
+
+ /**
+ * Get first DLSN in the log - async.
+ *
+ * @return first dlsn in the stream
+ */
+ Future<DLSN> getFirstDLSNAsync();
+
+ /**
+ * Get Latest DLSN in the log - async.
+ *
+ * @return latest dlsn
+ */
+ Future<DLSN> getLastDLSNAsync();
+
+ /**
+ * Get the number of log records in the active portion of the log.
+ *
+ * <p>Any log segments that have already been truncated will not be included.
+ *
+ * @return number of log records
+ * @throws IOException if the record count cannot be retrieved
+ */
+ long getLogRecordCount() throws IOException;
+
+ /**
+ * Get the number of log records in the active portion of the log - async.
+ *
+ * <p>Any log segments that have already been truncated will not be included.
+ *
+ * @param beginDLSN dlsn to begin counting from (presumably inclusive - TODO confirm)
+ * @return future number of log records
+ */
+ Future<Long> getLogRecordCountAsync(final DLSN beginDLSN);
+
+ /**
+ * Run recovery on the log.
+ *
+ * @throws IOException if recovery fails
+ */
+ void recover() throws IOException;
+
+ /**
+ * Check if an end of stream marker was added to the stream.
+ * A stream with an end of stream marker cannot be appended to.
+ *
+ * @return true if the marker was added to the stream, false otherwise
+ * @throws IOException if the marker state cannot be determined
+ */
+ boolean isEndOfStreamMarked() throws IOException;
+
+ /**
+ * Delete the log.
+ *
+ * @throws IOException if the deletion fails
+ */
+ void delete() throws IOException;
+
+ /**
+ * The DistributedLogManager may archive/purge any logs for transactionId
+ * less than or equal to minTxIdToKeep. (NOTE(review): the parameter description
+ * below says that txid is retained, i.e. a strict "less than" -- confirm.)
+ * This is to be used only when the client explicitly manages deletion; with a
+ * sliding-time-window cleanup policy this method need not be called.
+ *
+ * @param minTxIdToKeep the earliest txid that must be retained
+ * @throws IOException if purging fails
+ */
+ void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
+
+ /**
+ * Get the subscriptions store provided by the distributedlog manager.
+ *
+ * @return subscriptions store that manages subscriptions for the current stream.
+ */
+ SubscriptionsStore getSubscriptionsStore();
+
+ /**
+ * Closes this source and releases any system resources associated
+ * with it. If the source is already closed then invoking this
+ * method has no effect.
+ *
+ * @return future representing the close result.
+ */
+ Future<Void> asyncClose();
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java
new file mode 100644
index 0000000..aa3e94e
--- /dev/null
+++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/DistributedLogManagerImpl.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.apache.distributedlog.util.FutureUtils.newTFuture;
+
+import com.twitter.util.Future;
+import java.io.IOException;
+import java.util.List;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.subscription.SubscriptionsStore;
+
+/**
+ * Wrapper of {@link org.apache.distributedlog.api.DistributedLogManager} whose async methods return {@link com.twitter.util.Future}.
+ */
+public class DistributedLogManagerImpl implements DistributedLogManager {
+
+ private final org.apache.distributedlog.api.DistributedLogManager impl; // the wrapped manager; every method below delegates to it
+
+ public DistributedLogManagerImpl(org.apache.distributedlog.api.DistributedLogManager impl) {
+ this.impl = impl; // stored as given; no null check is performed here
+ }
+
+ @Override
+ public String getStreamName() {
+ return impl.getStreamName();
+ }
+
+ @Override
+ public NamespaceDriver getNamespaceDriver() {
+ return impl.getNamespaceDriver();
+ }
+
+ @Override
+ public List<LogSegmentMetadata> getLogSegments() throws IOException {
+ return impl.getLogSegments();
+ }
+
+ @Override
+ public void registerListener(LogSegmentListener listener) throws IOException {
+ impl.registerListener(listener);
+ }
+
+ @Override
+ public void unregisterListener(LogSegmentListener listener) {
+ impl.unregisterListener(listener);
+ }
+
+ @Override
+ public Future<AsyncLogWriter> openAsyncLogWriter() {
+ return newTFuture(impl.openAsyncLogWriter()).map(AsyncLogWriterImpl.MAP_FUNC); // adapt the future, then wrap the api writer in AsyncLogWriterImpl
+ }
+
+ @Override
+ public LogWriter startLogSegmentNonPartitioned() throws IOException {
+ return new LogWriterImpl(impl.startLogSegmentNonPartitioned()); // synchronous path: wrap the api writer directly
+ }
+
+ @Override
+ public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException {
+ return new AsyncLogWriterImpl(impl.startAsyncLogSegmentNonPartitioned()); // same wrapper type as MAP_FUNC produces, built without a future
+ }
+
+ @Override
+ public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException {
+ return impl.getAppendOnlyStreamWriter(); // returned as-is; unlike the log writers, no wrapper is applied
+ }
+
+ @Override
+ public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException {
+ return impl.getAppendOnlyStreamReader(); // returned as-is, no wrapper
+ }
+
+ @Override
+ public LogReader getInputStream(long fromTxnId) throws IOException {
+ return new LogReaderImpl(impl.getInputStream(fromTxnId)); // wrap the api reader
+ }
+
+ @Override
+ public LogReader getInputStream(DLSN fromDLSN) throws IOException {
+ return new LogReaderImpl(impl.getInputStream(fromDLSN)); // wrap the api reader
+ }
+
+ @Override
+ public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId) {
+ return newTFuture(impl.openAsyncLogReader(fromTxnId)).map(AsyncLogReaderImpl.MAP_FUNC); // adapt the future, then wrap the api reader
+ }
+
+ @Override
+ public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN) {
+ return newTFuture(impl.openAsyncLogReader(fromDLSN)).map(AsyncLogReaderImpl.MAP_FUNC); // adapt the future, then wrap the api reader
+ }
+
+ @Override
+ public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException {
+ return new AsyncLogReaderImpl(impl.getAsyncLogReader(fromTxnId)); // synchronous variant: wrap directly
+ }
+
+ @Override
+ public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException {
+ return new AsyncLogReaderImpl(impl.getAsyncLogReader(fromDLSN)); // synchronous variant: wrap directly
+ }
+
+ @Override
+ public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN) {
+ return newTFuture(impl.getAsyncLogReaderWithLock(fromDLSN)).map(AsyncLogReaderImpl.MAP_FUNC); // locking itself is handled by the delegate
+ }
+
+ @Override
+ public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId) {
+ return newTFuture(impl.getAsyncLogReaderWithLock(fromDLSN, subscriberId))
+ .map(AsyncLogReaderImpl.MAP_FUNC); // wrap the api reader once the future completes
+ }
+
+ @Override
+ public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId) {
+ return newTFuture(impl.getAsyncLogReaderWithLock(subscriberId))
+ .map(AsyncLogReaderImpl.MAP_FUNC); // wrap the api reader once the future completes
+ }
+
+ @Override
+ public Future<DLSN> getDLSNNotLessThanTxId(long transactionId) {
+ return newTFuture(impl.getDLSNNotLessThanTxId(transactionId)); // value type is shared, so only the future is adapted
+ }
+
+ @Override
+ public LogRecordWithDLSN getLastLogRecord() throws IOException {
+ return impl.getLastLogRecord();
+ }
+
+ @Override
+ public long getFirstTxId() throws IOException {
+ return impl.getFirstTxId();
+ }
+
+ @Override
+ public long getLastTxId() throws IOException {
+ return impl.getLastTxId();
+ }
+
+ @Override
+ public DLSN getLastDLSN() throws IOException {
+ return impl.getLastDLSN();
+ }
+
+ @Override
+ public Future<LogRecordWithDLSN> getLastLogRecordAsync() {
+ return newTFuture(impl.getLastLogRecordAsync()); // only the future is adapted
+ }
+
+ @Override
+ public Future<Long> getLastTxIdAsync() {
+ return newTFuture(impl.getLastTxIdAsync()); // only the future is adapted
+ }
+
+ @Override
+ public Future<DLSN> getFirstDLSNAsync() {
+ return newTFuture(impl.getFirstDLSNAsync()); // only the future is adapted
+ }
+
+ @Override
+ public Future<DLSN> getLastDLSNAsync() {
+ return newTFuture(impl.getLastDLSNAsync()); // only the future is adapted
+ }
+
+ @Override
+ public long getLogRecordCount() throws IOException {
+ return impl.getLogRecordCount();
+ }
+
+ @Override
+ public Future<Long> getLogRecordCountAsync(DLSN beginDLSN) {
+ return newTFuture(impl.getLogRecordCountAsync(beginDLSN)); // only the future is adapted
+ }
+
+ @Override
+ public void recover() throws IOException {
+ impl.recover();
+ }
+
+ @Override
+ public boolean isEndOfStreamMarked() throws IOException {
+ return impl.isEndOfStreamMarked();
+ }
+
+ @Override
+ public void delete() throws IOException {
+ impl.delete();
+ }
+
+ @Override
+ public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
+ impl.purgeLogsOlderThan(minTxIdToKeep);
+ }
+
+ @Override
+ public SubscriptionsStore getSubscriptionsStore() {
+ return new SubscriptionsStoreImpl(impl.getSubscriptionsStore()); // NOTE: a new wrapper is allocated on every call; the delegate's result is not null-checked
+ }
+
+ @Override
+ public void close() throws IOException {
+ impl.close(); // blocking close, forwarded to the delegate
+ }
+
+ @Override
+ public Future<Void> asyncClose() {
+ return newTFuture(impl.asyncClose()); // non-blocking close, forwarded to the delegate
+ }
+}