You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:51 UTC
[19/23] incubator-distributedlog git commit: DL-124: Use Java8 Future
rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java
new file mode 100644
index 0000000..bed516d
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestDistributedLogManagerImpl.java
@@ -0,0 +1,351 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.callback.LogSegmentListener;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.util.FutureUtils;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link DistributedLogManagerImpl}.
+ */
+public class TestDistributedLogManagerImpl {
+
+ private final org.apache.distributedlog.api.DistributedLogManager impl =
+ mock(org.apache.distributedlog.api.DistributedLogManager.class);
+ private final DistributedLogManagerImpl manager = new DistributedLogManagerImpl(impl);
+
+ @Test
+ public void testGetStreamName() throws Exception {
+ String name = "test-get-stream-name";
+ when(impl.getStreamName()).thenReturn(name);
+ assertEquals(name, manager.getStreamName());
+ verify(impl, times(1)).getStreamName();
+ }
+
+ @Test
+ public void testGetNamespaceDriver() throws Exception {
+ NamespaceDriver driver = mock(NamespaceDriver.class);
+ when(impl.getNamespaceDriver()).thenReturn(driver);
+ assertEquals(driver, manager.getNamespaceDriver());
+ verify(impl, times(1)).getNamespaceDriver();
+ }
+
+ @Test
+ public void testGetLogSegments() throws Exception {
+ List<LogSegmentMetadata> segments = mock(List.class);
+ when(impl.getLogSegments()).thenReturn(segments);
+ assertEquals(segments, manager.getLogSegments());
+ verify(impl, times(1)).getLogSegments();
+ }
+
+ @Test
+ public void testRegisterListener() throws Exception {
+ LogSegmentListener listener = mock(LogSegmentListener.class);
+ manager.registerListener(listener);
+ verify(impl, times(1)).registerListener(listener);
+ }
+
+ @Test
+ public void testUnregisterListener() throws Exception {
+ LogSegmentListener listener = mock(LogSegmentListener.class);
+ manager.unregisterListener(listener);
+ verify(impl, times(1)).unregisterListener(listener);
+ }
+
+ @Test
+ public void testOpenAsyncLogWriter() throws Exception {
+ AsyncLogWriter writer = mock(AsyncLogWriter.class);
+ when(impl.openAsyncLogWriter()).thenReturn(CompletableFuture.completedFuture(writer));
+ assertEquals(writer, ((AsyncLogWriterImpl) FutureUtils.result(manager.openAsyncLogWriter())).getImpl());
+ verify(impl, times(1)).openAsyncLogWriter();
+ }
+
+ @Test
+ public void testStartLogSegmentNonPartitioned() throws Exception {
+ LogWriter writer = mock(LogWriter.class);
+ when(impl.startLogSegmentNonPartitioned()).thenReturn(writer);
+ assertEquals(writer, ((LogWriterImpl) manager.startLogSegmentNonPartitioned()).getImpl());
+ verify(impl, times(1)).startLogSegmentNonPartitioned();
+ }
+
+ @Test
+ public void testStartAsyncLogSegmentNonPartitioned() throws Exception {
+ AsyncLogWriter writer = mock(AsyncLogWriter.class);
+ when(impl.startAsyncLogSegmentNonPartitioned()).thenReturn(writer);
+ assertEquals(writer, ((AsyncLogWriterImpl) manager.startAsyncLogSegmentNonPartitioned()).getImpl());
+ verify(impl, times(1)).startAsyncLogSegmentNonPartitioned();
+ }
+
+ @Test
+ public void testGetAppendOnlyStreamWriter() throws Exception {
+ AppendOnlyStreamWriter writer = mock(AppendOnlyStreamWriter.class);
+ when(impl.getAppendOnlyStreamWriter()).thenReturn(writer);
+ assertEquals(writer, manager.getAppendOnlyStreamWriter());
+ verify(impl, times(1)).getAppendOnlyStreamWriter();
+ }
+
+ @Test
+ public void testGetAppendOnlyStreamReader() throws Exception {
+ AppendOnlyStreamReader writer = mock(AppendOnlyStreamReader.class);
+ when(impl.getAppendOnlyStreamReader()).thenReturn(writer);
+ assertEquals(writer, manager.getAppendOnlyStreamReader());
+ verify(impl, times(1)).getAppendOnlyStreamReader();
+ }
+
+ @Test
+ public void testGetInputStream() throws Exception {
+ LogReader reader = mock(LogReader.class);
+ when(impl.getInputStream(anyLong())).thenReturn(reader);
+ assertEquals(reader, ((LogReaderImpl) manager.getInputStream(1234L)).getImpl());
+ verify(impl, times(1)).getInputStream(eq(1234L));
+ }
+
+ @Test
+ public void testGetInputStream2() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ LogReader reader = mock(LogReader.class);
+ when(impl.getInputStream(eq(dlsn))).thenReturn(reader);
+ assertEquals(reader, ((LogReaderImpl) manager.getInputStream(dlsn)).getImpl());
+ verify(impl, times(1)).getInputStream(eq(dlsn));
+ }
+
+ @Test
+ public void testOpenAsyncLogReader() throws Exception {
+ org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+ when(impl.openAsyncLogReader(eq(1234L))).thenReturn(CompletableFuture.completedFuture(reader));
+ assertEquals(reader,
+ ((AsyncLogReaderImpl) FutureUtils.result(manager.openAsyncLogReader(1234L))).getImpl());
+ verify(impl, times(1)).openAsyncLogReader(eq(1234L));
+ }
+
+ @Test
+ public void testOpenAsyncLogReader2() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+ when(impl.openAsyncLogReader(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(reader));
+ assertEquals(reader,
+ ((AsyncLogReaderImpl) FutureUtils.result(manager.openAsyncLogReader(dlsn))).getImpl());
+ verify(impl, times(1)).openAsyncLogReader(eq(dlsn));
+ }
+
+ @Test
+ public void testGetAsyncLogReader() throws Exception {
+ org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+ when(impl.getAsyncLogReader(eq(1234L))).thenReturn(reader);
+ assertEquals(reader,
+ ((AsyncLogReaderImpl) manager.getAsyncLogReader(1234L)).getImpl());
+ verify(impl, times(1)).getAsyncLogReader(eq(1234L));
+ }
+
+ @Test
+ public void testGetAsyncLogReader2() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+ when(impl.getAsyncLogReader(eq(dlsn))).thenReturn(reader);
+ assertEquals(reader,
+ ((AsyncLogReaderImpl) manager.getAsyncLogReader(dlsn)).getImpl());
+ verify(impl, times(1)).getAsyncLogReader(eq(dlsn));
+ }
+
+ @Test
+ public void testOpenAsyncLogReaderWithLock() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+ when(impl.getAsyncLogReaderWithLock(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(reader));
+ assertEquals(reader,
+ ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(dlsn))).getImpl());
+ verify(impl, times(1)).getAsyncLogReaderWithLock(eq(dlsn));
+ }
+
+ @Test
+ public void testOpenAsyncLogReaderWithLock2() throws Exception {
+ String subscriberId = "test-subscriber";
+ DLSN dlsn = mock(DLSN.class);
+ org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+ when(impl.getAsyncLogReaderWithLock(eq(dlsn), eq(subscriberId)))
+ .thenReturn(CompletableFuture.completedFuture(reader));
+ assertEquals(reader,
+ ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(dlsn, subscriberId))).getImpl());
+ verify(impl, times(1)).getAsyncLogReaderWithLock(eq(dlsn), eq(subscriberId));
+ }
+
+ @Test
+ public void testOpenAsyncLogReaderWithLock3() throws Exception {
+ String subscriberId = "test-subscriber";
+ org.apache.distributedlog.api.AsyncLogReader reader = mock(org.apache.distributedlog.api.AsyncLogReader.class);
+ when(impl.getAsyncLogReaderWithLock(eq(subscriberId)))
+ .thenReturn(CompletableFuture.completedFuture(reader));
+ assertEquals(reader,
+ ((AsyncLogReaderImpl) FutureUtils.result(manager.getAsyncLogReaderWithLock(subscriberId))).getImpl());
+ verify(impl, times(1)).getAsyncLogReaderWithLock(eq(subscriberId));
+ }
+
+ @Test
+ public void testGetDLSNNotLessThanTxId() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ when(impl.getDLSNNotLessThanTxId(anyLong())).thenReturn(CompletableFuture.completedFuture(dlsn));
+ assertEquals(dlsn, FutureUtils.result(manager.getDLSNNotLessThanTxId(1234L)));
+ verify(impl, times(1)).getDLSNNotLessThanTxId(eq(1234L));
+ }
+
+ @Test
+ public void testGetLastLogRecord() throws Exception {
+ LogRecordWithDLSN record = mock(LogRecordWithDLSN.class);
+ when(impl.getLastLogRecord()).thenReturn(record);
+ assertEquals(record, manager.getLastLogRecord());
+ verify(impl, times(1)).getLastLogRecord();
+ }
+
+ @Test
+ public void testFirstTxId() throws Exception {
+ long txId = System.currentTimeMillis();
+ when(impl.getFirstTxId()).thenReturn(txId);
+ assertEquals(txId, manager.getFirstTxId());
+ verify(impl, times(1)).getFirstTxId();
+ }
+
+ @Test
+ public void testLastTxId() throws Exception {
+ long txId = System.currentTimeMillis();
+ when(impl.getLastTxId()).thenReturn(txId);
+ assertEquals(txId, manager.getLastTxId());
+ verify(impl, times(1)).getLastTxId();
+ }
+
+ @Test
+ public void testLastDLSN() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ when(impl.getLastDLSN()).thenReturn(dlsn);
+ assertEquals(dlsn, manager.getLastDLSN());
+ verify(impl, times(1)).getLastDLSN();
+ }
+
+ @Test
+ public void testGetLastLogRecordAsync() throws Exception {
+ LogRecordWithDLSN record = mock(LogRecordWithDLSN.class);
+ when(impl.getLastLogRecordAsync()).thenReturn(CompletableFuture.completedFuture(record));
+ assertEquals(record, FutureUtils.result(manager.getLastLogRecordAsync()));
+ verify(impl, times(1)).getLastLogRecordAsync();
+ }
+
+ @Test
+ public void testLastTxIdAsync() throws Exception {
+ long txId = System.currentTimeMillis();
+ when(impl.getLastTxIdAsync()).thenReturn(CompletableFuture.completedFuture(txId));
+ assertEquals(txId, FutureUtils.result(manager.getLastTxIdAsync()).longValue());
+ verify(impl, times(1)).getLastTxIdAsync();
+ }
+
+ @Test
+ public void testLastDLSNAsync() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ when(impl.getLastDLSNAsync()).thenReturn(CompletableFuture.completedFuture(dlsn));
+ assertEquals(dlsn, FutureUtils.result(manager.getLastDLSNAsync()));
+ verify(impl, times(1)).getLastDLSNAsync();
+ }
+
+ @Test
+ public void testFirstDLSNAsync() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ when(impl.getFirstDLSNAsync()).thenReturn(CompletableFuture.completedFuture(dlsn));
+ assertEquals(dlsn, FutureUtils.result(manager.getFirstDLSNAsync()));
+ verify(impl, times(1)).getFirstDLSNAsync();
+ }
+
+ @Test
+ public void testGetLogRecordCount() throws Exception {
+ long count = System.currentTimeMillis();
+ when(impl.getLogRecordCount()).thenReturn(count);
+ assertEquals(count, manager.getLogRecordCount());
+ verify(impl, times(1)).getLogRecordCount();
+ }
+
+ @Test
+ public void testGetLogRecordCountAsync() throws Exception {
+ DLSN dlsn = mock(DLSN.class);
+ long count = System.currentTimeMillis();
+ when(impl.getLogRecordCountAsync(eq(dlsn))).thenReturn(CompletableFuture.completedFuture(count));
+ assertEquals(count, FutureUtils.result(manager.getLogRecordCountAsync(dlsn)).longValue());
+ verify(impl, times(1)).getLogRecordCountAsync(eq(dlsn));
+ }
+
+ @Test
+ public void testRecover() throws Exception {
+ manager.recover();
+ verify(impl, times(1)).recover();
+ }
+
+ @Test
+ public void testIsEndOfStreamMarked() throws Exception {
+ when(impl.isEndOfStreamMarked()).thenReturn(true);
+ assertTrue(manager.isEndOfStreamMarked());
+ verify(impl, times(1)).isEndOfStreamMarked();
+ }
+
+ @Test
+ public void testDelete() throws Exception {
+ manager.delete();
+ verify(impl, times(1)).delete();
+ }
+
+ @Test
+ public void testPurgeLogsOlderThan() throws Exception {
+ long minTxIdToKeep = System.currentTimeMillis();
+ manager.purgeLogsOlderThan(minTxIdToKeep);
+ verify(impl, times(1)).purgeLogsOlderThan(eq(minTxIdToKeep));
+ }
+
+ @Test
+ public void testGetSubscriptionsStore() throws Exception {
+ SubscriptionsStore ss = mock(SubscriptionsStore.class);
+ when(impl.getSubscriptionsStore()).thenReturn(ss);
+ assertEquals(ss, ((SubscriptionsStoreImpl) manager.getSubscriptionsStore()).getImpl());
+ verify(impl, times(1)).getSubscriptionsStore();
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ manager.close();
+ verify(impl, times(1)).close();
+ }
+
+ @Test
+ public void testAsyncClose() throws Exception {
+ when(impl.asyncClose()).thenReturn(CompletableFuture.completedFuture(null));
+ FutureUtils.result(manager.asyncClose());
+ verify(impl, times(1)).asyncClose();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java
new file mode 100644
index 0000000..4adc386
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogReaderImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.util.FutureUtils;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link LogReaderImpl}.
+ */
+public class TestLogReaderImpl {
+
+ private final org.apache.distributedlog.api.LogReader underlying =
+ mock(org.apache.distributedlog.api.LogReader.class);
+ private final LogReaderImpl reader = new LogReaderImpl(underlying);
+
+ @Test
+ public void testReadNext() throws Exception {
+ reader.readNext(false);
+ verify(underlying, times(1)).readNext(eq(false));
+ }
+
+ @Test
+ public void testReadBulk() throws Exception {
+ reader.readBulk(false, 100);
+ verify(underlying, times(1)).readBulk(eq(false), eq(100));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ reader.close();
+ verify(underlying, times(1)).close();
+ }
+
+ @Test
+ public void testAsyncClose() throws Exception {
+ when(underlying.asyncClose())
+ .thenReturn(CompletableFuture.completedFuture(null));
+ FutureUtils.result(reader.asyncClose());
+ verify(underlying, times(1)).asyncClose();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java
new file mode 100644
index 0000000..be69260
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestLogWriterImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.List;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link LogWriterImpl}.
+ */
+public class TestLogWriterImpl {
+
+ private final org.apache.distributedlog.api.LogWriter underlying =
+ mock(org.apache.distributedlog.api.LogWriter.class);
+ private final LogWriterImpl writer = new LogWriterImpl(underlying);
+
+ @Test
+ public void testWrite() throws Exception {
+ LogRecord record = mock(LogRecord.class);
+ writer.write(record);
+ verify(underlying, times(1)).write(eq(record));
+ }
+
+ @Test
+ public void testWriteBulk() throws Exception {
+ List<LogRecord> records = mock(List.class);
+ writer.writeBulk(records);
+ verify(underlying, times(1)).writeBulk(eq(records));
+ }
+
+ @Test
+ public void testSetReadyToFlush() throws Exception {
+ writer.setReadyToFlush();
+ verify(underlying, times(1)).setReadyToFlush();
+ }
+
+ @Test
+ public void testFlushAndSync() throws Exception {
+ writer.flushAndSync();
+ verify(underlying, times(1)).flushAndSync();
+ }
+
+ @Test
+ public void testMarkEndOfStream() throws Exception {
+ writer.markEndOfStream();
+ verify(underlying, times(1)).markEndOfStream();
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ writer.close();
+ verify(underlying, times(1)).close();
+ }
+
+ @Test
+ public void testAbort() throws Exception {
+ writer.abort();
+ verify(underlying, times(1)).abort();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java
new file mode 100644
index 0000000..e6573aa
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/TestSubscriptionStoreImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.util.FutureUtils;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link SubscriptionsStoreImpl}.
+ */
+public class TestSubscriptionStoreImpl {
+
+ private final SubscriptionsStore underlying = mock(SubscriptionsStore.class);
+ private final SubscriptionsStoreImpl store = new SubscriptionsStoreImpl(underlying);
+
+ @Test
+ public void testGetLastCommitPosition() throws Exception {
+ String subscriber = "test-subscriber";
+ DLSN dlsn = mock(DLSN.class);
+ when(underlying.getLastCommitPosition(anyString()))
+ .thenReturn(CompletableFuture.completedFuture(dlsn));
+ assertEquals(dlsn,
+ FutureUtils.result(store.getLastCommitPosition(subscriber)));
+ verify(underlying, times(1)).getLastCommitPosition(eq(subscriber));
+ }
+
+ @Test
+ public void testGetLastCommitPositions() throws Exception {
+ Map<String, DLSN> positions = mock(Map.class);
+ when(underlying.getLastCommitPositions())
+ .thenReturn(CompletableFuture.completedFuture(positions));
+ assertEquals(positions, FutureUtils.result(store.getLastCommitPositions()));
+ verify(underlying, times(1)).getLastCommitPositions();
+ }
+
+ @Test
+ public void testAdvanceCommmitPosition() throws Exception {
+ String subscriber = "test-subscriber";
+ DLSN dlsn = mock(DLSN.class);
+ when(underlying.advanceCommitPosition(anyString(), any(DLSN.class)))
+ .thenReturn(CompletableFuture.completedFuture(null));
+ FutureUtils.result(store.advanceCommitPosition(subscriber, dlsn));
+ verify(underlying, times(1))
+ .advanceCommitPosition(eq(subscriber), eq(dlsn));
+ }
+
+ @Test
+ public void testDeleteSubscriber() throws Exception {
+ String subscriber = "test-subscriber";
+ when(underlying.deleteSubscriber(anyString()))
+ .thenReturn(CompletableFuture.completedFuture(true));
+ assertTrue(FutureUtils.result(store.deleteSubscriber(subscriber)));
+ verify(underlying, times(1)).deleteSubscriber(eq(subscriber));
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ store.close();
+ verify(underlying, times(1)).close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
new file mode 100644
index 0000000..78dcb2a
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.namespace;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.net.URI;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link DistributedLogNamespaceBuilder}.
+ */
+public class TestDistributedLogNamespaceBuilder {
+
+ private final NamespaceBuilder underlying = mock(NamespaceBuilder.class);
+ private final DistributedLogNamespaceBuilder builder = new DistributedLogNamespaceBuilder(underlying);
+
+ @Test
+ public void testConf() {
+ DistributedLogConfiguration conf = mock(DistributedLogConfiguration.class);
+ builder.conf(conf);
+ verify(underlying, times(1)).conf(eq(conf));
+ }
+
+ @Test
+ public void testDynConf() {
+ DynamicDistributedLogConfiguration conf = mock(DynamicDistributedLogConfiguration.class);
+ builder.dynConf(conf);
+ verify(underlying, times(1)).dynConf(eq(conf));
+ }
+
+ @Test
+ public void testUri() {
+ URI uri = URI.create("distributedlog://127.0.0.1/messaging/distributedlog");
+ builder.uri(uri);
+ verify(underlying, times(1)).uri(eq(uri));
+ }
+
+ @Test
+ public void testStatsLogger() {
+ StatsLogger statsLogger = mock(StatsLogger.class);
+ builder.statsLogger(statsLogger);
+ verify(underlying, times(1)).statsLogger(eq(statsLogger));
+ }
+
+ @Test
+ public void testPerLogStatsLogger() {
+ StatsLogger statsLogger = mock(StatsLogger.class);
+ builder.perLogStatsLogger(statsLogger);
+ verify(underlying, times(1)).perLogStatsLogger(eq(statsLogger));
+ }
+
+ @Test
+ public void testFeatureProvider() {
+ FeatureProvider provider = mock(FeatureProvider.class);
+ builder.featureProvider(provider);
+ verify(underlying, times(1)).featureProvider(eq(provider));
+ }
+
+ @Test
+ public void testClientId() {
+ String clientId = "test-client-id";
+ builder.clientId(clientId);
+ verify(underlying, times(1)).clientId(eq(clientId));
+ }
+
+ @Test
+ public void testRegionId() {
+ int regionId = 1234;
+ builder.regionId(regionId);
+ verify(underlying, times(1)).regionId(eq(regionId));
+ }
+
+ @Test
+ public void testBuild() throws Exception {
+ builder.build();
+ verify(underlying, times(1)).build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java
new file mode 100644
index 0000000..b562fe4
--- /dev/null
+++ b/distributedlog-core-twitter/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceImpl.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.distributedlog.namespace;
+
+import static junit.framework.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.Optional;
+import java.util.Iterator;
+import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.callback.NamespaceListener;
+import org.junit.Test;
+
+/**
+ * Unit test of {@link DistributedLogNamespaceImpl}.
+ */
+public class TestDistributedLogNamespaceImpl {
+
+ private final Namespace impl = mock(Namespace.class);
+ private final DistributedLogNamespaceImpl namespace = new DistributedLogNamespaceImpl(impl);
+
+ @Test
+ public void testGetNamespaceDriver() {
+ NamespaceDriver driver = mock(NamespaceDriver.class);
+ when(impl.getNamespaceDriver()).thenReturn(driver);
+ assertEquals(driver, namespace.getNamespaceDriver());
+ verify(impl, times(1)).getNamespaceDriver();
+ }
+
+ @Test
+ public void testCreateLog() throws Exception {
+ String logName = "test-log-name";
+ namespace.createLog(logName);
+ verify(impl, times(1)).createLog(eq(logName));
+ }
+
+ @Test
+ public void testDeleteLog() throws Exception {
+ String logName = "test-log-name";
+ namespace.deleteLog(logName);
+ verify(impl, times(1)).deleteLog(eq(logName));
+ }
+
+ @Test
+ public void testOpenLog() throws Exception {
+ String logName = "test-open-log";
+ namespace.openLog(logName);
+ verify(impl, times(1)).openLog(eq(logName));
+ }
+
+ @Test
+ public void testOpenLog2() throws Exception {
+ String logName = "test-open-log";
+ namespace.openLog(logName, Optional.absent(), Optional.absent(), Optional.absent());
+ verify(impl, times(1))
+ .openLog(eq(logName), eq(Optional.absent()), eq(Optional.absent()), eq(Optional.absent()));
+ }
+
+ @Test
+ public void testLogExists() throws Exception {
+ String logName = "test-log-exists";
+ when(impl.logExists(anyString())).thenReturn(true);
+ assertTrue(namespace.logExists(logName));
+ verify(impl, times(1)).logExists(eq(logName));
+ }
+
+ @Test
+ public void testGetLogs() throws Exception {
+ Iterator<String> logs = mock(Iterator.class);
+ when(impl.getLogs()).thenReturn(logs);
+ assertEquals(logs, namespace.getLogs());
+ verify(impl, times(1)).getLogs();
+ }
+
+ @Test
+ public void testRegisterNamespaceListener() throws Exception {
+ NamespaceListener listener = mock(NamespaceListener.class);
+ namespace.registerNamespaceListener(listener);
+ verify(impl, times(1)).registerNamespaceListener(eq(listener));
+ }
+
+ @Test
+ public void testCreateAccessControlManager() throws Exception {
+ AccessControlManager manager = mock(AccessControlManager.class);
+ when(impl.createAccessControlManager()).thenReturn(manager);
+ assertEquals(manager, namespace.createAccessControlManager());
+ verify(impl, times(1)).createAccessControlManager();
+ }
+
+ @Test
+ public void testClose() {
+ namespace.close();
+ verify(impl, times(1)).close();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-core/conf/log4j.properties b/distributedlog-core/conf/log4j.properties
index 930db8d..af1cf5f 100644
--- a/distributedlog-core/conf/log4j.properties
+++ b/distributedlog-core/conf/log4j.properties
@@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO
log4j.logger.org.apache.bookkeeper=INFO
# redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
log4j.appender.Executors=org.apache.log4j.RollingFileAppender
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-core/pom.xml b/distributedlog-core/pom.xml
index a4f7568..1ad51c2 100644
--- a/distributedlog-core/pom.xml
+++ b/distributedlog-core/pom.xml
@@ -26,6 +26,16 @@
<name>Apache DistributedLog :: Core Library</name>
<dependencies>
<dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-common</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-protocol</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
@@ -41,50 +51,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.jmock</groupId>
- <artifactId>jmock</artifactId>
- <version>${jmock.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>${slf4j.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
- </dependency>
- <dependency>
- <groupId>com.twitter.common</groupId>
- <artifactId>stats-util</artifactId>
- <version>${stats-util.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-jdk14</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>util-core_2.11</artifactId>
- <version>${finagle.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- <version>${commons-lang3.version}</version>
- </dependency>
- <dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.9.3</version>
@@ -114,19 +80,28 @@
</exclusions>
</dependency>
<dependency>
- <groupId>commons-cli</groupId>
- <artifactId>commons-cli</artifactId>
- <version>${commons-cli.version}</version>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
- <groupId>org.apache.distributedlog</groupId>
- <artifactId>distributedlog-protocol</artifactId>
- <version>${project.parent.version}</version>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>${junit.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>net.jpountz.lz4</groupId>
- <artifactId>lz4</artifactId>
- <version>${lz4.version}</version>
+ <groupId>org.jmock</groupId>
+ <artifactId>jmock</artifactId>
+ <version>${jmock.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
@@ -136,7 +111,7 @@
</dependency>
<dependency>
<groupId>org.apache.distributedlog</groupId>
- <artifactId>distributedlog-protocol</artifactId>
+ <artifactId>distributedlog-common</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -172,7 +147,7 @@
<properties>
<property>
<name>listener</name>
- <value>org.apache.distributedlog.TimedOutTestsListener</value>
+ <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value>
</property>
</properties>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
index 1d96f0e..3a31907 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamReader.java
@@ -18,10 +18,10 @@
package org.apache.distributedlog;
import com.google.common.base.Preconditions;
-
import java.io.IOException;
import java.io.InputStream;
-
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
index 8278c68..dde78c2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AppendOnlyStreamWriter.java
@@ -17,14 +17,12 @@
*/
package org.apache.distributedlog;
-import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
import java.io.Closeable;
import java.io.IOException;
-
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.exceptions.UnexpectedException;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,16 +42,16 @@ public class AppendOnlyStreamWriter implements Closeable {
this.requestPos = pos;
}
- public Future<DLSN> write(byte[] data) {
+ public CompletableFuture<DLSN> write(byte[] data) {
requestPos += data.length;
- Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
- return writeResult.addEventListener(new WriteCompleteListener(requestPos));
+ CompletableFuture<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
+ return writeResult.whenComplete(new WriteCompleteListener(requestPos));
}
public void force(boolean metadata) throws IOException {
long pos = 0;
try {
- pos = Await.result(logWriter.flushAndCommit());
+ pos = FutureUtils.result(logWriter.flushAndCommit());
} catch (IOException ioe) {
throw ioe;
} catch (Exception ex) {
@@ -78,7 +76,7 @@ public class AppendOnlyStreamWriter implements Closeable {
public void markEndOfStream() throws IOException {
try {
- Await.result(logWriter.markEndOfStream());
+ FutureUtils.result(logWriter.markEndOfStream());
} catch (IOException ioe) {
throw ioe;
} catch (Exception ex) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
deleted file mode 100644
index e3ace05..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogReader.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-public interface AsyncLogReader extends AsyncCloseable {
-
- /**
- * Get stream name that the reader reads from.
- *
- * @return stream name.
- */
- public String getStreamName();
-
- /**
- * Read the next record from the log stream
- *
- * @return A promise that when satisfied will contain the Log Record with its DLSN.
- */
- public Future<LogRecordWithDLSN> readNext();
-
- /**
- * Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list
- * of entries. It doesn't block until returning exact <i>numEntries</i>. It is a best effort
- * call.
- *
- * @param numEntries
- * num entries
- * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
- */
- public Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
-
- /**
- * Read next <i>numEntries</i> entries in a given <i>waitTime</i>.
- * <p>
- * The future is satisfied when either reads <i>numEntries</i> entries or reaches <i>waitTime</i>.
- * The only exception is if there isn't any new entries written within <i>waitTime</i>, it would
- * wait until new entries are available.
- *
- * @param numEntries
- * max entries to return
- * @param waitTime
- * maximum wait time if there are entries already for read
- * @param timeUnit
- * wait time unit
- * @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
- */
- public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
deleted file mode 100644
index 53b393b..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncLogWriter.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.io.AsyncAbortable;
-import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
-
-import java.io.Closeable;
-import java.util.List;
-
-public interface AsyncLogWriter extends AsyncCloseable, AsyncAbortable {
-
- /**
- * Get the last committed transaction id.
- *
- * @return last committed transaction id.
- */
- public long getLastTxId();
-
- /**
- * Write a log record to the stream.
- *
- * @param record single log record
- * @return A Future which contains a DLSN if the record was successfully written
- * or an exception if the write fails
- */
- public Future<DLSN> write(LogRecord record);
-
- /**
- * Write log records to the stream in bulk. Each future in the list represents the result of
- * one write operation. The size of the result list is equal to the size of the input list.
- * Buffers are written in order, and the list of result futures has the same order.
- *
- * @param record set of log records
- * @return A Future which contains a list of Future DLSNs if the record was successfully written
- * or an exception if the operation fails.
- */
- public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> record);
-
- /**
- * Truncate the log until <i>dlsn</i>.
- *
- * @param dlsn
- * dlsn to truncate until.
- * @return A Future indicates whether the operation succeeds or not, or an exception
- * if the truncation fails.
- */
- public Future<Boolean> truncate(DLSN dlsn);
-
- /**
- * Get the name of the stream this writer writes data to
- */
- public String getStreamName();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
index c12bd10..367bb50 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/AsyncNotification.java
@@ -17,7 +17,7 @@
*/
package org.apache.distributedlog;
-public interface AsyncNotification {
+interface AsyncNotification {
/**
* Triggered when the background activity encounters an exception
*
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
index 4a2ef30..8a0bffb 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAbstractLogWriter.java
@@ -18,6 +18,9 @@
package org.apache.distributedlog;
import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.LockingException;
@@ -27,18 +30,12 @@ import org.apache.distributedlog.io.Abortable;
import org.apache.distributedlog.io.Abortables;
import org.apache.distributedlog.io.AsyncAbortable;
import org.apache.distributedlog.io.AsyncCloseable;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
import java.io.Closeable;
import java.io.IOException;
@@ -53,18 +50,18 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
protected final BKDistributedLogManager bkDistributedLogManager;
// States
- private Promise<Void> closePromise = null;
+ private CompletableFuture<Void> closePromise = null;
private volatile boolean forceRolling = false;
private boolean forceRecovery = false;
// Truncation Related
- private Future<List<LogSegmentMetadata>> lastTruncationAttempt = null;
+ private CompletableFuture<List<LogSegmentMetadata>> lastTruncationAttempt = null;
@VisibleForTesting
private Long minTimestampToKeepOverride = null;
// Log Segment Writers
protected BKLogSegmentWriter segmentWriter = null;
- protected Future<BKLogSegmentWriter> segmentWriterFuture = null;
+ protected CompletableFuture<BKLogSegmentWriter> segmentWriterFuture = null;
protected BKLogSegmentWriter allocatedSegmentWriter = null;
protected BKLogWriteHandler writeHandler = null;
@@ -100,7 +97,7 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
// This code path will be executed when the handler is not set or has been closed
// due to forceRecovery during testing
BKLogWriteHandler newHandler =
- FutureUtils.result(bkDistributedLogManager.asyncCreateWriteHandler(false));
+ Utils.ioResult(bkDistributedLogManager.asyncCreateWriteHandler(false));
boolean success = false;
try {
synchronized (this) {
@@ -123,13 +120,13 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
return segmentWriter;
}
- protected synchronized Future<BKLogSegmentWriter> getCachedLogWriterFuture() {
+ protected synchronized CompletableFuture<BKLogSegmentWriter> getCachedLogWriterFuture() {
return segmentWriterFuture;
}
protected synchronized void cacheLogWriter(BKLogSegmentWriter logWriter) {
this.segmentWriter = logWriter;
- this.segmentWriterFuture = Future.value(logWriter);
+ this.segmentWriterFuture = FutureUtils.value(logWriter);
}
protected synchronized BKLogSegmentWriter removeCachedLogWriter() {
@@ -157,12 +154,12 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
}
}
- private Future<Void> asyncCloseAndComplete(boolean shouldThrow) {
+ private CompletableFuture<Void> asyncCloseAndComplete(boolean shouldThrow) {
BKLogSegmentWriter segmentWriter = getCachedLogWriter();
BKLogWriteHandler writeHandler = getCachedWriteHandler();
if (null != segmentWriter && null != writeHandler) {
cancelTruncation();
- Promise<Void> completePromise = new Promise<Void>();
+ CompletableFuture<Void> completePromise = new CompletableFuture<Void>();
asyncCloseAndComplete(segmentWriter, writeHandler, completePromise, shouldThrow);
return completePromise;
} else {
@@ -172,10 +169,10 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
private void asyncCloseAndComplete(final BKLogSegmentWriter segmentWriter,
final BKLogWriteHandler writeHandler,
- final Promise<Void> completePromise,
+ final CompletableFuture<Void> completePromise,
final boolean shouldThrow) {
writeHandler.completeAndCloseLogSegment(segmentWriter)
- .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+ .whenComplete(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onSuccess(LogSegmentMetadata segment) {
removeCachedLogWriter();
@@ -189,15 +186,11 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
}
private void complete(final Throwable cause) {
- closeNoThrow().ensure(new AbstractFunction0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- if (null != cause && shouldThrow) {
- FutureUtils.setException(completePromise, cause);
- } else {
- FutureUtils.setValue(completePromise, null);
- }
- return BoxedUnit.UNIT;
+ FutureUtils.ensure(closeNoThrow(), () -> {
+ if (null != cause && shouldThrow) {
+ FutureUtils.completeExceptionally(completePromise, cause);
+ } else {
+ FutureUtils.complete(completePromise, null);
}
});
}
@@ -206,63 +199,67 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
@VisibleForTesting
void closeAndComplete() throws IOException {
- FutureUtils.result(asyncCloseAndComplete(true));
+ Utils.ioResult(asyncCloseAndComplete(true));
}
- protected Future<Void> asyncCloseAndComplete() {
+ protected CompletableFuture<Void> asyncCloseAndComplete() {
return asyncCloseAndComplete(true);
}
@Override
public void close() throws IOException {
- FutureUtils.result(asyncClose());
+ Utils.ioResult(asyncClose());
}
@Override
- public Future<Void> asyncClose() {
+ public CompletableFuture<Void> asyncClose() {
return asyncCloseAndComplete(false);
}
/**
* Close the writer and release all the underlying resources
*/
- protected Future<Void> closeNoThrow() {
- Promise<Void> closeFuture;
+ protected CompletableFuture<Void> closeNoThrow() {
+ CompletableFuture<Void> closeFuture;
synchronized (this) {
if (null != closePromise) {
return closePromise;
}
- closeFuture = closePromise = new Promise<Void>();
+ closeFuture = closePromise = new CompletableFuture<Void>();
}
cancelTruncation();
- Utils.closeSequence(bkDistributedLogManager.getScheduler(),
- true, /** ignore close errors **/
- getCachedLogWriter(),
- getAllocatedLogWriter(),
- getCachedWriteHandler()
- ).proxyTo(closeFuture);
+ FutureUtils.proxyTo(
+ Utils.closeSequence(bkDistributedLogManager.getScheduler(),
+ true, /** ignore close errors **/
+ getCachedLogWriter(),
+ getAllocatedLogWriter(),
+ getCachedWriteHandler()
+ ),
+ closeFuture);
return closeFuture;
}
@Override
public void abort() throws IOException {
- FutureUtils.result(asyncAbort());
+ Utils.ioResult(asyncAbort());
}
@Override
- public Future<Void> asyncAbort() {
- Promise<Void> closeFuture;
+ public CompletableFuture<Void> asyncAbort() {
+ CompletableFuture<Void> closeFuture;
synchronized (this) {
if (null != closePromise) {
return closePromise;
}
- closeFuture = closePromise = new Promise<Void>();
+ closeFuture = closePromise = new CompletableFuture<Void>();
}
cancelTruncation();
- Abortables.abortSequence(bkDistributedLogManager.getScheduler(),
- getCachedLogWriter(),
- getAllocatedLogWriter(),
- getCachedWriteHandler()).proxyTo(closeFuture);
+ FutureUtils.proxyTo(
+ Abortables.abortSequence(bkDistributedLogManager.getScheduler(),
+ getCachedLogWriter(),
+ getAllocatedLogWriter(),
+ getCachedWriteHandler()),
+ closeFuture);
return closeFuture;
}
@@ -270,22 +267,22 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
protected BKLogSegmentWriter getLedgerWriter(final long startTxId,
final boolean allowMaxTxID)
throws IOException {
- Future<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true);
+ CompletableFuture<BKLogSegmentWriter> logSegmentWriterFuture = asyncGetLedgerWriter(true);
BKLogSegmentWriter logSegmentWriter = null;
if (null != logSegmentWriterFuture) {
- logSegmentWriter = FutureUtils.result(logSegmentWriterFuture);
+ logSegmentWriter = Utils.ioResult(logSegmentWriterFuture);
}
if (null == logSegmentWriter || (shouldStartNewSegment(logSegmentWriter) || forceRolling)) {
- logSegmentWriter = FutureUtils.result(rollLogSegmentIfNecessary(
+ logSegmentWriter = Utils.ioResult(rollLogSegmentIfNecessary(
logSegmentWriter, startTxId, true /* bestEffort */, allowMaxTxID));
}
return logSegmentWriter;
}
// used by async writer
- synchronized protected Future<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) {
+ synchronized protected CompletableFuture<BKLogSegmentWriter> asyncGetLedgerWriter(boolean resetOnError) {
final BKLogSegmentWriter ledgerWriter = getCachedLogWriter();
- Future<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture();
+ CompletableFuture<BKLogSegmentWriter> ledgerWriterFuture = getCachedLogWriterFuture();
if (null == ledgerWriterFuture || null == ledgerWriter) {
return null;
}
@@ -293,38 +290,38 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
// Handle the case where the last call to write actually caused an error in the log
if ((ledgerWriter.isLogSegmentInError() || forceRecovery) && resetOnError) {
// Close the ledger writer so that we will recover and start a new log segment
- Future<Void> closeFuture;
+ CompletableFuture<Void> closeFuture;
if (ledgerWriter.isLogSegmentInError()) {
closeFuture = ledgerWriter.asyncAbort();
} else {
closeFuture = ledgerWriter.asyncClose();
}
- return closeFuture.flatMap(
- new AbstractFunction1<Void, Future<BKLogSegmentWriter>>() {
+ return closeFuture.thenCompose(
+ new Function<Void, CompletionStage<BKLogSegmentWriter>>() {
@Override
- public Future<BKLogSegmentWriter> apply(Void result) {
+ public CompletableFuture<BKLogSegmentWriter> apply(Void result) {
removeCachedLogWriter();
if (ledgerWriter.isLogSegmentInError()) {
- return Future.value(null);
+ return FutureUtils.value(null);
}
BKLogWriteHandler writeHandler;
try {
writeHandler = getWriteHandler();
} catch (IOException e) {
- return Future.exception(e);
+ return FutureUtils.exception(e);
}
if (null != writeHandler && forceRecovery) {
return writeHandler.completeAndCloseLogSegment(ledgerWriter)
- .map(new AbstractFunction1<LogSegmentMetadata, BKLogSegmentWriter>() {
+ .thenApply(new Function<LogSegmentMetadata, BKLogSegmentWriter>() {
@Override
public BKLogSegmentWriter apply(LogSegmentMetadata completedLogSegment) {
return null;
}
});
} else {
- return Future.value(null);
+ return FutureUtils.value(null);
}
}
});
@@ -357,32 +354,25 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
// skip scheduling if there is task that's already running
//
synchronized (this) {
- if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDefined())) {
+ if (truncationEnabled && ((lastTruncationAttempt == null) || lastTruncationAttempt.isDone())) {
lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
}
}
}
- private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
+ private CompletableFuture<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
final long startTxId,
final boolean allowMaxTxID) {
return writeHandler.recoverIncompleteLogSegments()
- .flatMap(new AbstractFunction1<Long, Future<BKLogSegmentWriter>>() {
- @Override
- public Future<BKLogSegmentWriter> apply(Long lastTxId) {
- return writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)
- .onSuccess(new AbstractFunction1<BKLogSegmentWriter, BoxedUnit>() {
- @Override
- public BoxedUnit apply(BKLogSegmentWriter newSegmentWriter) {
+ .thenCompose(
+ lastTxId -> writeHandler.asyncStartLogSegment(startTxId, false, allowMaxTxID)
+ .thenApply(newSegmentWriter -> {
cacheLogWriter(newSegmentWriter);
- return BoxedUnit.UNIT;
- }
- });
- }
- });
+ return newSegmentWriter;
+ }));
}
- private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(
+ private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOneWithPermit(
final BKLogSegmentWriter oldSegmentWriter,
final BKLogWriteHandler writeHandler,
final long startTxId,
@@ -390,47 +380,46 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
final boolean allowMaxTxID) {
final PermitManager.Permit switchPermit = bkDistributedLogManager.getLogSegmentRollingPermitManager().acquirePermit();
if (switchPermit.isAllowed()) {
- return closeOldLogSegmentAndStartNewOne(
- oldSegmentWriter,
- writeHandler,
- startTxId,
- bestEffort,
- allowMaxTxID
- ).rescue(new Function<Throwable, Future<BKLogSegmentWriter>>() {
- @Override
- public Future<BKLogSegmentWriter> apply(Throwable cause) {
- if (cause instanceof LockingException) {
- LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ",
- writeHandler.getFullyQualifiedName(), cause);
- bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
- return Future.value(oldSegmentWriter);
- } else if (cause instanceof ZKException) {
- ZKException zke = (ZKException) cause;
- if (ZKException.isRetryableZKException(zke)) {
- LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." +
- " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(),
- zke.getKeeperExceptionCode());
+ return FutureUtils.ensure(
+ FutureUtils.rescue(
+ closeOldLogSegmentAndStartNewOne(
+ oldSegmentWriter,
+ writeHandler,
+ startTxId,
+ bestEffort,
+ allowMaxTxID
+ ),
+ // rescue function
+ cause -> {
+ if (cause instanceof LockingException) {
+ LOG.warn("We lost lock during completeAndClose log segment for {}. Disable ledger rolling until it is recovered : ",
+ writeHandler.getFullyQualifiedName(), cause);
bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
- return Future.value(oldSegmentWriter);
+ return FutureUtils.value(oldSegmentWriter);
+ } else if (cause instanceof ZKException) {
+ ZKException zke = (ZKException) cause;
+ if (ZKException.isRetryableZKException(zke)) {
+ LOG.warn("Encountered zookeeper connection issues during completeAndClose log segment for {}." +
+ " Disable ledger rolling until it is recovered : {}", writeHandler.getFullyQualifiedName(),
+ zke.getKeeperExceptionCode());
+ bkDistributedLogManager.getLogSegmentRollingPermitManager().disallowObtainPermits(switchPermit);
+ return FutureUtils.value(oldSegmentWriter);
+ }
}
+ return FutureUtils.exception(cause);
}
- return Future.exception(cause);
- }
- }).ensure(new AbstractFunction0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- bkDistributedLogManager.getLogSegmentRollingPermitManager()
- .releasePermit(switchPermit);
- return BoxedUnit.UNIT;
- }
- });
+ ),
+ // ensure function
+ () -> bkDistributedLogManager.getLogSegmentRollingPermitManager()
+ .releasePermit(switchPermit)
+ );
} else {
bkDistributedLogManager.getLogSegmentRollingPermitManager().releasePermit(switchPermit);
- return Future.value(oldSegmentWriter);
+ return FutureUtils.value(oldSegmentWriter);
}
}
- private Future<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(
+ private CompletableFuture<BKLogSegmentWriter> closeOldLogSegmentAndStartNewOne(
final BKLogSegmentWriter oldSegmentWriter,
final BKLogWriteHandler writeHandler,
final long startTxId,
@@ -444,14 +433,14 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
writeHandler.getFullyQualifiedName());
}
return writeHandler.asyncStartLogSegment(startTxId, bestEffort, allowMaxTxID)
- .flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() {
+ .thenCompose(new Function<BKLogSegmentWriter, CompletableFuture<BKLogSegmentWriter>>() {
@Override
- public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) {
+ public CompletableFuture<BKLogSegmentWriter> apply(BKLogSegmentWriter newSegmentWriter) {
if (null == newSegmentWriter) {
if (bestEffort) {
- return Future.value(oldSegmentWriter);
+ return FutureUtils.value(oldSegmentWriter);
} else {
- return Future.exception(
+ return FutureUtils.exception(
new UnexpectedException("StartLogSegment returns null for bestEffort rolling"));
}
}
@@ -468,30 +457,30 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
}
}
- private Future<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(
+ private CompletableFuture<BKLogSegmentWriter> completeOldSegmentAndCacheNewLogSegmentWriter(
BKLogSegmentWriter oldSegmentWriter,
final BKLogSegmentWriter newSegmentWriter) {
- final Promise<BKLogSegmentWriter> completePromise = new Promise<BKLogSegmentWriter>();
+ final CompletableFuture<BKLogSegmentWriter> completePromise = new CompletableFuture<BKLogSegmentWriter>();
// complete the old log segment
writeHandler.completeAndCloseLogSegment(oldSegmentWriter)
- .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+ .whenComplete(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onSuccess(LogSegmentMetadata value) {
cacheLogWriter(newSegmentWriter);
removeAllocatedLogWriter();
- FutureUtils.setValue(completePromise, newSegmentWriter);
+ FutureUtils.complete(completePromise, newSegmentWriter);
}
@Override
public void onFailure(Throwable cause) {
- FutureUtils.setException(completePromise, cause);
+ FutureUtils.completeExceptionally(completePromise, cause);
}
});
return completePromise;
}
- synchronized protected Future<BKLogSegmentWriter> rollLogSegmentIfNecessary(
+ synchronized protected CompletableFuture<BKLogSegmentWriter> rollLogSegmentIfNecessary(
final BKLogSegmentWriter segmentWriter,
long startTxId,
boolean bestEffort,
@@ -500,18 +489,18 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
try {
writeHandler = getWriteHandler();
} catch (IOException e) {
- return Future.exception(e);
+ return FutureUtils.exception(e);
}
- Future<BKLogSegmentWriter> rollPromise;
+ CompletableFuture<BKLogSegmentWriter> rollPromise;
if (null != segmentWriter && (writeHandler.shouldStartNewSegment(segmentWriter) || forceRolling)) {
rollPromise = closeOldLogSegmentAndStartNewOneWithPermit(
segmentWriter, writeHandler, startTxId, bestEffort, allowMaxTxID);
} else if (null == segmentWriter) {
rollPromise = asyncStartNewLogSegment(writeHandler, startTxId, allowMaxTxID);
} else {
- rollPromise = Future.value(segmentWriter);
+ rollPromise = FutureUtils.value(segmentWriter);
}
- return rollPromise.map(new AbstractFunction1<BKLogSegmentWriter, BKLogSegmentWriter>() {
+ return rollPromise.thenApply(new Function<BKLogSegmentWriter, BKLogSegmentWriter>() {
@Override
public BKLogSegmentWriter apply(BKLogSegmentWriter newSegmentWriter) {
if (segmentWriter == newSegmentWriter) {
@@ -542,7 +531,7 @@ abstract class BKAbstractLogWriter implements Closeable, AsyncCloseable, Abortab
protected synchronized void cancelTruncation() {
if (null != lastTruncationAttempt) {
- FutureUtils.cancel(lastTruncationAttempt);
+ lastTruncationAttempt.cancel(true);
lastTruncationAttempt = null;
}
}