You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:38 UTC
[06/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
index ccbfc44..4ad0bc0 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogSegmentWriter.java
@@ -17,24 +17,23 @@
*/
package org.apache.distributedlog;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
-import org.apache.distributedlog.io.Abortables;
import org.apache.distributedlog.lock.SessionLockFactory;
import org.apache.distributedlog.lock.ZKDistributedLock;
import org.apache.distributedlog.lock.ZKSessionLockFactory;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -42,14 +41,12 @@ import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import scala.runtime.AbstractFunction0;
import java.io.IOException;
import java.net.URI;
@@ -129,9 +126,9 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
boolean acquireLock)
throws Exception {
try {
- Await.result(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0],
+ Utils.ioResult(Utils.zkAsyncCreateFullPathOptimistic(zkClient, path, new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT));
- } catch (KeeperException.NodeExistsException nee) {
+ } catch (ZKException zke) {
// node already exists
}
SessionLockFactory lockFactory = new ZKSessionLockFactory(
@@ -150,7 +147,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
Long.MAX_VALUE,
NullStatsLogger.INSTANCE);
if (acquireLock) {
- return FutureUtils.result(lock.asyncAcquire());
+ return Utils.ioResult(lock.asyncAcquire());
} else {
return lock;
}
@@ -158,9 +155,9 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
private void closeWriterAndLock(BKLogSegmentWriter writer,
ZKDistributedLock lock)
- throws IOException {
+ throws Exception {
try {
- FutureUtils.result(writer.asyncClose());
+ Utils.ioResult(writer.asyncClose());
} finally {
Utils.closeQuietly(lock);
}
@@ -170,7 +167,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
ZKDistributedLock lock)
throws IOException {
try {
- Abortables.abort(writer, false);
+ Utils.abort(writer, false);
} finally {
Utils.closeQuietly(lock);
}
@@ -231,10 +228,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
createLogSegmentWriter(confLocal, 0L, -1L, lock);
// Use another lock to wait for writer releasing lock
ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
- Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+ CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
// add 10 records
int numRecords = 10;
- List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+ List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
for (int i = 0; i < numRecords; i++) {
futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
}
@@ -248,7 +245,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
10, writer.getPositionWithinLogSegment());
// close the writer should flush buffered data and release lock
closeWriterAndLock(writer, lock);
- Await.result(lockFuture0);
+ Utils.ioResult(lockFuture0);
lock0.checkOwnership();
assertEquals("Last tx id should still be " + (numRecords - 1),
numRecords - 1, writer.getLastTxId());
@@ -256,7 +253,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
numRecords - 1, writer.getLastTxIdAcknowledged());
assertEquals("Position should still be " + numRecords,
10, writer.getPositionWithinLogSegment());
- List<DLSN> dlsns = Await.result(Future.collect(futureList));
+ List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
assertEquals("All records should be written",
numRecords, dlsns.size());
for (int i = 0; i < numRecords; i++) {
@@ -293,10 +290,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
createLogSegmentWriter(confLocal, 0L, -1L, lock);
// Use another lock to wait for writer releasing lock
ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
- Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+ CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
// add 10 records
int numRecords = 10;
- List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+ List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
for (int i = 0; i < numRecords; i++) {
futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
}
@@ -310,7 +307,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
10, writer.getPositionWithinLogSegment());
// close the writer should flush buffered data and release lock
abortWriterAndLock(writer, lock);
- Await.result(lockFuture0);
+ Utils.ioResult(lockFuture0);
lock0.checkOwnership();
assertEquals("Last tx id should still be " + (numRecords - 1),
numRecords - 1, writer.getLastTxId());
@@ -323,7 +320,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
for (int i = 0; i < numRecords; i++) {
try {
- Await.result(futureList.get(i));
+ Utils.ioResult(futureList.get(i));
fail("Should be aborted record " + i + " with transmit exception");
} catch (WriteCancelledException wce) {
assertTrue("Record " + i + " should be aborted because of ledger fenced",
@@ -369,10 +366,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
createLogSegmentWriter(confLocal, 0L, -1L, lock);
// Use another lock to wait for writer releasing lock
ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
- Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+ CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
// add 10 records
int numRecords = 10;
- List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+ List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
for (int i = 0; i < numRecords; i++) {
futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
}
@@ -393,7 +390,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
assertEquals("Inconsistent rc is thrown",
rcToFailComplete, bkte.getBKResultCode());
}
- Await.result(lockFuture0);
+ Utils.ioResult(lockFuture0);
lock0.checkOwnership();
assertEquals("Last tx id should still be " + (numRecords - 1),
numRecords - 1, writer.getLastTxId());
@@ -406,7 +403,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
for (int i = 0; i < numRecords; i++) {
try {
- Await.result(futureList.get(i));
+ Utils.ioResult(futureList.get(i));
fail("Should be aborted record " + i + " with transmit exception");
} catch (WriteCancelledException wce) {
assertTrue("Record " + i + " should be aborted because of ledger fenced",
@@ -441,10 +438,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
createLogSegmentWriter(confLocal, 0L, -1L, lock);
// Use another lock to wait for writer releasing lock
ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
- Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+ CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
// add 10 records
int numRecords = 10;
- List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+ List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
for (int i = 0; i < numRecords; i++) {
futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
}
@@ -467,7 +464,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
BKException.Code.LedgerFencedException, bkte.getBKResultCode());
}
- Await.result(lockFuture0);
+ Utils.ioResult(lockFuture0);
lock0.checkOwnership();
assertEquals("Last tx id should still be " + (numRecords - 1),
@@ -481,7 +478,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
for (int i = 0; i < numRecords; i++) {
try {
- Await.result(futureList.get(i));
+ Utils.ioResult(futureList.get(i));
fail("Should be aborted record " + i + " with transmit exception");
} catch (BKTransmitException bkte) {
assertEquals("Record " + i + " should be aborted",
@@ -513,10 +510,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
createLogSegmentWriter(confLocal, 0L, -1L, lock);
// Use another lock to wait for writer releasing lock
ZKDistributedLock lock0 = createLock("/test/lock-" + runtime.getMethodName(), zkc0, false);
- Future<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
+ CompletableFuture<ZKDistributedLock> lockFuture0 = lock0.asyncAcquire();
// add 10 records
int numRecords = 10;
- List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+ List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
for (int i = 0; i < numRecords; i++) {
futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
}
@@ -530,23 +527,19 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
numRecords, writer.getPositionWithinLogSegment());
final CountDownLatch deferLatch = new CountDownLatch(1);
- writer.getFuturePool().apply(new AbstractFunction0<Object>() {
- @Override
- public Object apply() {
- try {
- deferLatch.await();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted on deferring completion : ", e);
- }
- return null;
+ writer.getFuturePool().submit(() -> {
+ try {
+ deferLatch.await();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted on deferring completion : ", e);
}
});
// transmit the buffered data
- FutureUtils.result(writer.flush());
+ Utils.ioResult(writer.flush());
// add another 10 records
- List<Future<DLSN>> anotherFutureList = new ArrayList<Future<DLSN>>(numRecords);
+ List<CompletableFuture<DLSN>> anotherFutureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
for (int i = numRecords; i < 2 * numRecords; i++) {
anotherFutureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
}
@@ -562,13 +555,13 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
// abort the writer: it waits for outstanding transmits and abort buffered data
abortWriterAndLock(writer, lock);
- Await.result(lockFuture0);
+ Utils.ioResult(lockFuture0);
lock0.checkOwnership();
// release defer latch so completion would go through
deferLatch.countDown();
- List<DLSN> dlsns = Await.result(Future.collect(futureList));
+ List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
assertEquals("All first 10 records should be written",
numRecords, dlsns.size());
for (int i = 0; i < numRecords; i++) {
@@ -582,7 +575,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
}
for (int i = 0; i < numRecords; i++) {
try {
- Await.result(anotherFutureList.get(i));
+ Utils.ioResult(anotherFutureList.get(i));
fail("Should be aborted record " + (numRecords + i) + " with transmit exception");
} catch (WriteCancelledException wce) {
// writes should be cancelled.
@@ -622,7 +615,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
createLogSegmentWriter(confLocal, 0L, -1L, lock);
// add 10 records
int numRecords = 10;
- List<Future<DLSN>> futureList = new ArrayList<Future<DLSN>>(numRecords);
+ List<CompletableFuture<DLSN>> futureList = new ArrayList<CompletableFuture<DLSN>>(numRecords);
for (int i = 0; i < numRecords; i++) {
futureList.add(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(i)));
}
@@ -639,7 +632,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
// close the writer to flush the output buffer
closeWriterAndLock(writer, lock);
- List<DLSN> dlsns = Await.result(Future.collect(futureList));
+ List<DLSN> dlsns = Utils.ioResult(FutureUtils.collect(futureList));
assertEquals("All 11 records should be written",
numRecords + 1, dlsns.size());
for (int i = 0; i < numRecords; i++) {
@@ -687,10 +680,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
// close the writer
closeWriterAndLock(writer, lock);
- FutureUtils.result(writer.asyncClose());
+ Utils.ioResult(writer.asyncClose());
try {
- Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1)));
+ Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1)));
fail("Should fail the write if the writer is closed");
} catch (WriteException we) {
// expected
@@ -713,10 +706,10 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
BKLogSegmentWriter writer =
createLogSegmentWriter(confLocal, 0L, -1L, lock);
- FutureUtils.result(writer.markEndOfStream());
+ Utils.ioResult(writer.markEndOfStream());
try {
- Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1)));
+ Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(1)));
fail("Should fail the write if the writer is marked as end of stream");
} catch (EndOfStreamException we) {
// expected
@@ -747,7 +740,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
LogRecord record = DLMTestUtil.getLogRecordInstance(1);
record.setControl();
try {
- Await.result(writer.asyncWrite(record));
+ Utils.ioResult(writer.asyncWrite(record));
fail("Should fail the writer if the log segment is already fenced");
} catch (BKTransmitException bkte) {
// expected
@@ -755,7 +748,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
}
try {
- Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2)));
+ Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2)));
fail("Should fail the writer if the log segment is already fenced");
} catch (WriteException we) {
// expected
@@ -781,7 +774,7 @@ public class TestBKLogSegmentWriter extends TestDistributedLogBase {
createLogSegmentWriter(confLocal, 0L, -1L, lock);
assertEquals(DLSN.InvalidDLSN,
- Await.result(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2))));
+ Utils.ioResult(writer.asyncWrite(DLMTestUtil.getLogRecordInstance(2))));
assertEquals(-1L, ((BKLogSegmentEntryWriter) writer.getEntryWriter())
.getLedgerHandle().getLastAddPushed());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
index 2566d34..c0f208f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogWriteHandler.java
@@ -17,12 +17,13 @@
*/
package org.apache.distributedlog;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.bk.LedgerAllocator;
import org.apache.distributedlog.bk.LedgerAllocatorPool;
import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.Utils;
import org.junit.Rule;
import org.junit.Test;
@@ -58,7 +59,7 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase {
confLocal.setLedgerAllocatorPoolName("test-allocator-pool");
BKDistributedLogNamespace namespace = (BKDistributedLogNamespace)
- DistributedLogNamespaceBuilder.newBuilder()
+ NamespaceBuilder.newBuilder()
.conf(confLocal)
.uri(uri)
.build();
@@ -66,8 +67,8 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase {
FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentOnAssignLogSegmentSequenceNumber,
FailpointUtils.FailPointActions.FailPointAction_Throw);
try {
- AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
fail("Should fail opening the writer");
} catch (IOException ioe) {
// expected
@@ -82,7 +83,7 @@ public class TestBKLogWriteHandler extends TestDistributedLogBase {
LedgerAllocatorPool allocatorPool = (LedgerAllocatorPool) allocator;
assertEquals(0, allocatorPool.obtainMapSize());
- AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
writer.write(DLMTestUtil.getLogRecordInstance(1L));
Utils.close(writer);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java
index bb8503f..07f0db5 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKSyncLogReader.java
@@ -17,6 +17,8 @@
*/
package org.apache.distributedlog;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.junit.Rule;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
index a766d3e..5e4ba07 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java
@@ -21,6 +21,9 @@ import static org.junit.Assert.assertTrue;
import com.google.common.base.Optional;
import com.google.common.base.Ticker;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryWriter;
import org.apache.distributedlog.injector.AsyncFailureInjector;
@@ -29,14 +32,13 @@ import org.apache.distributedlog.io.AsyncCloseable;
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.logsegment.LogSegmentMetadataCache;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitLimiter;
-import org.apache.distributedlog.util.SchedulerUtils;
-import com.twitter.util.Future;
+import org.apache.distributedlog.common.util.PermitLimiter;
+import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
@@ -171,7 +173,7 @@ public class TestDistributedLogBase {
throws Exception {
URI uri = createDLMURI("/" + name);
ensureURICreated(uri);
- final DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ final Namespace namespace = NamespaceBuilder.newBuilder()
.uri(uri)
.conf(conf)
.build();
@@ -181,14 +183,14 @@ public class TestDistributedLogBase {
.build();
AsyncCloseable resourcesCloseable = new AsyncCloseable() {
@Override
- public Future<Void> asyncClose() {
+ public CompletableFuture<Void> asyncClose() {
LOG.info("Shutting down the scheduler");
SchedulerUtils.shutdownScheduler(scheduler, 1, TimeUnit.SECONDS);
LOG.info("Shut down the scheduler");
LOG.info("Closing the namespace");
namespace.close();
LOG.info("Closed the namespace");
- return Future.Void();
+ return FutureUtils.Void();
}
};
AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder()
@@ -217,20 +219,20 @@ public class TestDistributedLogBase {
Optional.of(resourcesCloseable));
}
- protected LogSegmentMetadataStore getLogSegmentMetadataStore(DistributedLogNamespace namespace)
+ protected LogSegmentMetadataStore getLogSegmentMetadataStore(Namespace namespace)
throws IOException {
return namespace.getNamespaceDriver().getLogStreamMetadataStore(NamespaceDriver.Role.READER)
.getLogSegmentMetadataStore();
}
- protected ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) throws Exception {
+ protected ZooKeeperClient getZooKeeperClient(Namespace namespace) throws Exception {
NamespaceDriver driver = namespace.getNamespaceDriver();
assertTrue(driver instanceof BKNamespaceDriver);
return ((BKNamespaceDriver) driver).getWriterZKC();
}
@SuppressWarnings("deprecation")
- protected BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) throws Exception {
+ protected BookKeeperClient getBookKeeperClient(Namespace namespace) throws Exception {
NamespaceDriver driver = namespace.getNamespaceDriver();
assertTrue(driver instanceof BKNamespaceDriver);
return ((BKNamespaceDriver) driver).getReaderBKC();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
index 6d8bd0c..30ef481 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestEntry.java
@@ -17,19 +17,17 @@
*/
package org.apache.distributedlog;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.Entry.Reader;
import org.apache.distributedlog.Entry.Writer;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.io.Buffer;
import org.apache.distributedlog.io.CompressionCodec;
-import com.twitter.io.Buf;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Test;
@@ -80,7 +78,7 @@ public class TestEntry {
LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]);
try {
- writer.writeRecord(largeRecord, new Promise<DLSN>());
+ writer.writeRecord(largeRecord, new CompletableFuture<DLSN>());
Assert.fail("Should fail on writing large record");
} catch (LogRecordTooLongException lrtle) {
// expected
@@ -103,12 +101,12 @@ public class TestEntry {
assertEquals("zero bytes", 0, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
- List<Future<DLSN>> writePromiseList = Lists.newArrayList();
+ List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
// write first 5 records
for (int i = 0; i < 5; i++) {
LogRecord record = new LogRecord(i, ("record-" + i).getBytes(UTF_8));
record.setPositionWithinLogSegment(i);
- Promise<DLSN> writePromise = new Promise<DLSN>();
+ CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords());
@@ -117,7 +115,7 @@ public class TestEntry {
// write large record
LogRecord largeRecord = new LogRecord(1L, new byte[MAX_LOGRECORD_SIZE + 1]);
try {
- writer.writeRecord(largeRecord, new Promise<DLSN>());
+ writer.writeRecord(largeRecord, new CompletableFuture<DLSN>());
Assert.fail("Should fail on writing large record");
} catch (LogRecordTooLongException lrtle) {
// expected
@@ -128,7 +126,7 @@ public class TestEntry {
for (int i = 0; i < 5; i++) {
LogRecord record = new LogRecord(i + 5, ("record-" + (i + 5)).getBytes(UTF_8));
record.setPositionWithinLogSegment(i + 5);
- Promise<DLSN> writePromise = new Promise<DLSN>();
+ CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
@@ -138,7 +136,7 @@ public class TestEntry {
// Test transmit complete
writer.completeTransmit(1L, 1L);
- List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
+ List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
for (int i = 0; i < 10; i++) {
Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
}
@@ -175,23 +173,23 @@ public class TestEntry {
assertEquals("zero bytes", 0, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
- List<Future<DLSN>> writePromiseList = Lists.newArrayList();
+ List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
// write first 5 records
for (int i = 0; i < 5; i++) {
LogRecord record = new LogRecord(i, ("record-" + i).getBytes(UTF_8));
record.setPositionWithinLogSegment(i);
- Promise<DLSN> writePromise = new Promise<DLSN>();
+ CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords());
}
final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(1024, CompressionCodec.Type.NONE);
- List<Future<DLSN>> recordSetPromiseList = Lists.newArrayList();
+ List<CompletableFuture<DLSN>> recordSetPromiseList = Lists.newArrayList();
// write another 5 records as a batch
for (int i = 0; i < 5; i++) {
ByteBuffer record = ByteBuffer.wrap(("record-" + (i + 5)).getBytes(UTF_8));
- Promise<DLSN> writePromise = new Promise<DLSN>();
+ CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
recordSetWriter.writeRecord(record, writePromise);
recordSetPromiseList.add(writePromise);
assertEquals((i + 1) + " records", (i + 1), recordSetWriter.getNumRecords());
@@ -202,8 +200,8 @@ public class TestEntry {
LogRecord setRecord = new LogRecord(5L, data);
setRecord.setPositionWithinLogSegment(5);
setRecord.setRecordSet();
- Promise<DLSN> writePromise = new Promise<DLSN>();
- writePromise.addEventListener(new FutureEventListener<DLSN>() {
+ CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
+ writePromise.whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN dlsn) {
recordSetWriter.completeTransmit(
@@ -224,7 +222,7 @@ public class TestEntry {
for (int i = 0; i < 5; i++) {
LogRecord record = new LogRecord(i + 10, ("record-" + (i + 10)).getBytes(UTF_8));
record.setPositionWithinLogSegment(i + 10);
- writePromise = new Promise<DLSN>();
+ writePromise = new CompletableFuture<DLSN>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 11) + " records", (i + 11), writer.getNumRecords());
@@ -234,7 +232,7 @@ public class TestEntry {
// Test transmit complete
writer.completeTransmit(1L, 1L);
- List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
+ List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writePromiseList));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, 1L, i), writeResults.get(i));
}
@@ -242,7 +240,7 @@ public class TestEntry {
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, 1L, (10 + i)), writeResults.get(6 + i));
}
- List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetPromiseList));
+ List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetPromiseList));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, 1L, (5 + i)), recordSetWriteResults.get(i));
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java
index fd3c4ee..c111baf 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestInterleavedReaders.java
@@ -17,7 +17,9 @@
*/
package org.apache.distributedlog;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.util.Utils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,11 +83,11 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
BKAsyncLogWriter writer1 = dlmwrite1.startAsyncLogSegmentNonPartitioned();
for (long j = 1; j <= 4; j++) {
for (int k = 1; k <= 10; k++) {
- FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
- FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
}
- FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
- FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
if (null == reader0) {
reader0 = dlmreader0.getInputStream(1);
}
@@ -124,13 +126,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
writer1.setForceRolling(true);
}
for (int k = 1; k <= 2; k++) {
- FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
- FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
writer0.setForceRolling(false);
writer1.setForceRolling(false);
}
- FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
- FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
LOG.info("Completed {} write", j);
if (null == reader0) {
reader0 = dlmreader0.getInputStream(1);
@@ -170,13 +172,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
writer0.setForceRolling(true);
writer1.setForceRolling(true);
}
- FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
- FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
writer0.setForceRolling(false);
writer1.setForceRolling(false);
}
- FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
- FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
if (null == reader0) {
reader0 = dlmreader0.getInputStream(1);
}
@@ -212,9 +214,9 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
writer1.setForceRolling(true);
writer1.overRideMinTimeStampToKeep(retentionPeriodOverride);
}
- DLSN dlsn1 = FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ DLSN dlsn1 = Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
LOG.info("writer1 write record {}", dlsn1);
- DLSN dlsn0 = FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ DLSN dlsn0 = Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
LOG.info("writer0 write record {}", dlsn0);
if (k == 5) {
writer0.setForceRolling(false);
@@ -223,8 +225,8 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
}
Thread.sleep(5);
}
- FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
- FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
}
writer0.close();
writer1.close();
@@ -264,15 +266,15 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
writer0.setForceRecovery(true);
writer1.setForceRecovery(true);
}
- DLSN dlsn1 = FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ DLSN dlsn1 = Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
LOG.info("writer1 write record {} - txid = {}", dlsn1, txid-1);
- DLSN dlsn0 = FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ DLSN dlsn0 = Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
LOG.info("writer0 write record {} - txid = {}", dlsn0, txid-1);
writer0.setForceRecovery(false);
writer1.setForceRecovery(false);
}
- FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
- FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
if (null == reader0) {
reader0 = dlmreader0.getInputStream(1);
}
@@ -313,13 +315,13 @@ public class TestInterleavedReaders extends TestDistributedLogBase {
writer1.setForceRolling(true);
}
for (int k = 1; k <= 2; k++) {
- FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
- FutureUtils.result(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer0.write(DLMTestUtil.getLogRecordInstance(txid++)));
writer0.setForceRolling(false);
writer1.setForceRolling(false);
}
- FutureUtils.result(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
- FutureUtils.result(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer1.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
+ Utils.ioResult(writer0.writeControlRecord(DLMTestUtil.getLogRecordInstance(txid-1)));
if (null == reader0) {
reader0 = dlmreader0.getInputStream(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java
index 152e4d8..8bdf86d 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentCreation.java
@@ -20,9 +20,10 @@ package org.apache.distributedlog;
import java.net.URI;
import java.util.List;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.util.Await;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.util.Utils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@ public class TestLogSegmentCreation extends TestDistributedLogBase {
.setImmediateFlushEnabled(true)
.setEnableLedgerAllocatorPool(true)
.setLedgerAllocatorPoolName("test");
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf).uri(uri).build();
DistributedLogManager dlm = namespace.openLog(name);
final int numSegments = 3;
@@ -68,7 +69,7 @@ public class TestLogSegmentCreation extends TestDistributedLogBase {
writer2.closeAndComplete();
try {
- Await.result(writer1.write(DLMTestUtil.getLogRecordInstance(numSegments + 1)));
+ Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(numSegments + 1)));
fail("Should fail on writing new log records.");
} catch (Throwable t) {
LOG.error("Failed to write entry : ", t);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java
index 31df059..39ffe85 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentMetadata.java
@@ -21,8 +21,7 @@ import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataBuilder;
import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
import org.apache.distributedlog.LogSegmentMetadata.TruncationStatus;
import org.apache.distributedlog.exceptions.UnsupportedMetadataVersionException;
-
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.Utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -64,7 +63,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase {
LogSegmentMetadata metadata1 = new LogSegmentMetadataBuilder("/metadata1",
LogSegmentMetadata.LEDGER_METADATA_CURRENT_LAYOUT_VERSION, 1000, 1).setRegionId(TEST_REGION_ID).build();
metadata1.write(zkc);
- LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata1"));
+ LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata1"));
assertEquals(metadata1, read1);
assertEquals(TEST_REGION_ID, read1.getRegionId());
}
@@ -75,7 +74,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase {
1, 1000, 1).setRegionId(TEST_REGION_ID).build();
metadata1.write(zkc);
// synchronous read
- LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata2", true));
+ LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata2", true));
assertEquals(read1.getLogSegmentId(), metadata1.getLogSegmentId());
assertEquals(read1.getFirstTxId(), metadata1.getFirstTxId());
assertEquals(read1.getLastTxId(), metadata1.getLastTxId());
@@ -90,7 +89,7 @@ public class TestLogSegmentMetadata extends ZooKeeperClusterTestCase {
metadata1.write(zkc);
// synchronous read
try {
- LogSegmentMetadata read1 = FutureUtils.result(LogSegmentMetadata.read(zkc, "/metadata-failure"));
+ LogSegmentMetadata read1 = Utils.ioResult(LogSegmentMetadata.read(zkc, "/metadata-failure"));
fail("The previous statement should throw an exception");
} catch (UnsupportedMetadataVersionException e) {
// Expected
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
index 8c01a5c..fcc3395 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestLogSegmentsZK.java
@@ -17,11 +17,12 @@
*/
package org.apache.distributedlog;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.metadata.LogMetadata;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.util.DLUtils;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Versioned;
@@ -80,7 +81,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
.setImmediateFlushEnabled(true)
.setEnableLedgerAllocatorPool(true)
.setLedgerAllocatorPoolName("test");
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
+ Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
namespace.createLog(streamName);
MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
@@ -111,7 +112,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
.setImmediateFlushEnabled(true)
.setEnableLedgerAllocatorPool(true)
.setLedgerAllocatorPoolName("test");
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
+ Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
namespace.createLog(streamName);
MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
@@ -167,7 +168,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
.setImmediateFlushEnabled(true)
.setEnableLedgerAllocatorPool(true)
.setLedgerAllocatorPoolName("test");
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
+ Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
namespace.createLog(streamName);
MaxLogSegmentSequenceNo max1 = getMaxLogSegmentSequenceNo(getZooKeeperClient(namespace), uri, streamName, conf);
@@ -223,7 +224,7 @@ public class TestLogSegmentsZK extends TestDistributedLogBase {
.setImmediateFlushEnabled(true)
.setEnableLedgerAllocatorPool(true)
.setLedgerAllocatorPoolName("test");
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
+ Namespace namespace = NamespaceBuilder.newBuilder().conf(conf).uri(uri).build();
namespace.createLog(streamName);
DistributedLogManager dlm1 = namespace.openLog(streamName);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java
index 5bfbf45..2b02704 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReads.java
@@ -21,9 +21,11 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
import org.apache.distributedlog.exceptions.IdleReaderException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.util.Utils;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -229,7 +231,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
BKAsyncLogWriter out = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
for (long j = 1; j <= segmentSize; j++) {
LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
- FutureUtils.result(out.write(op));
+ Utils.ioResult(out.write(op));
numRecordsWritten++;
}
out.closeAndComplete();
@@ -237,7 +239,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
String completedZNode = blplm.completedLedgerZNode(txid - segmentSize, txid - 1, 3);
- LogSegmentMetadata metadata = FutureUtils.result(LogSegmentMetadata.read(zkClient, completedZNode));
+ LogSegmentMetadata metadata = Utils.ioResult(LogSegmentMetadata.read(zkClient, completedZNode));
zkClient.get().delete(completedZNode, -1);
LogSegmentMetadata metadataToChange =
metadata.mutator()
@@ -253,7 +255,7 @@ public class TestNonBlockingReads extends TestDistributedLogBase {
BKAsyncLogWriter out = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
for (long j = 1; j <= segmentSize; j++) {
LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
- FutureUtils.result(out.write(op));
+ Utils.ioResult(out.write(op));
numRecordsWritten++;
}
out.closeAndComplete();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java
index 8f445c4..6c9e354 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestNonBlockingReadsMultiReader.java
@@ -18,8 +18,10 @@
package org.apache.distributedlog;
import com.google.common.util.concurrent.RateLimiter;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.Utils;
import org.junit.Test;
@@ -93,8 +95,8 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
DistributedLogManager dlmwrite = createNewDLM(confLocal, name);
final AsyncLogWriter writer = dlmwrite.startAsyncLogSegmentNonPartitioned();
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(0)));
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1)));
final AtomicInteger writeCount = new AtomicInteger(2);
DistributedLogManager dlmread = createNewDLM(conf, name);
@@ -116,7 +118,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
while (running.get()) {
limiter.acquire();
long curTxId = txid++;
- dlsn = FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
+ dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
writeCount.incrementAndGet();
if (curTxId % 1000 == 0) {
LOG.info("writer write {}", curTxId);
@@ -126,7 +128,7 @@ public class TestNonBlockingReadsMultiReader extends TestDistributedLogBase {
Utils.close(writer);
} catch (DLInterruptedException die) {
Thread.currentThread().interrupt();
- } catch (IOException e) {
+ } catch (Exception e) {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
index ac9984b..eda8eb2 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadAheadEntryReader.java
@@ -20,16 +20,18 @@ package org.apache.distributedlog;
import com.google.common.base.Optional;
import com.google.common.base.Ticker;
import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryStore;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.logsegment.LogSegmentEntryStore;
import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.After;
@@ -130,14 +132,14 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
}
private void ensureOrderSchedulerEmpty(String streamName) throws Exception {
- final Promise<Void> promise = new Promise<Void>();
+ final CompletableFuture<Void> promise = new CompletableFuture<Void>();
scheduler.submit(streamName, new Runnable() {
@Override
public void run() {
- FutureUtils.setValue(promise, null);
+ FutureUtils.complete(promise, null);
}
});
- FutureUtils.result(promise);
+ Utils.ioResult(promise);
}
void generateCompletedLogSegments(DistributedLogManager dlm,
@@ -153,12 +155,12 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
long txid = startTxId;
for (long i = 0; i < numCompletedSegments; i++) {
- AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
for (long j = 1; j <= segmentSize; j++) {
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(txid);
ctrlRecord.setControl();
- FutureUtils.result(writer.write(ctrlRecord));
+ Utils.ioResult(writer.write(ctrlRecord));
}
Utils.close(writer);
}
@@ -167,12 +169,12 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
AsyncLogWriter createInprogressLogSegment(DistributedLogManager dlm,
DistributedLogConfiguration conf,
long segmentSize) throws Exception {
- AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
for (long i = 1L; i <= segmentSize; i++) {
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
LogRecord ctrlRecord = DLMTestUtil.getLogRecordInstance(i);
ctrlRecord.setControl();
- FutureUtils.result(writer.write(ctrlRecord));
+ Utils.ioResult(writer.write(ctrlRecord));
}
return writer;
}
@@ -325,8 +327,8 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
// generate list of log segments
generateCompletedLogSegments(dlm, 3, 3);
- AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
- FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+ Utils.ioResult(writer.truncate(new DLSN(2L, 1L, 0L)));
List<LogSegmentMetadata> segments = dlm.getLogSegments();
@@ -382,8 +384,8 @@ public class TestReadAheadEntryReader extends TestDistributedLogBase {
// generate list of log segments
generateCompletedLogSegments(dlm, 3, 2);
- AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
- FutureUtils.result(writer.truncate(new DLSN(2L, 1L, 0L)));
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+ Utils.ioResult(writer.truncate(new DLSN(2L, 1L, 0L)));
List<LogSegmentMetadata> segments = dlm.getLogSegments();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java
index 029e872..efc9ac6 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReadUtils.java
@@ -18,16 +18,15 @@
package org.apache.distributedlog;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
-import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
import org.junit.Rule;
import org.junit.Test;
@@ -47,7 +46,7 @@ public class TestReadUtils extends TestDistributedLogBase {
@Rule
public TestName runtime = new TestName();
- private Future<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
+ private CompletableFuture<Optional<LogRecordWithDLSN>> getLogRecordNotLessThanTxId(
BKDistributedLogManager bkdlm, int logsegmentIdx, long transactionId) throws Exception {
List<LogSegmentMetadata> logSegments = bkdlm.getLogSegments();
return ReadUtils.getLogRecordNotLessThanTxId(
@@ -60,7 +59,7 @@ public class TestReadUtils extends TestDistributedLogBase {
);
}
- private Future<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception {
+ private CompletableFuture<LogRecordWithDLSN> getFirstGreaterThanRecord(BKDistributedLogManager bkdlm, int ledgerNo, DLSN dlsn) throws Exception {
List<LogSegmentMetadata> ledgerList = bkdlm.getLogSegments();
return ReadUtils.asyncReadFirstUserRecord(
bkdlm.getStreamName(), ledgerList.get(ledgerNo), 2, 16, new AtomicInteger(0), Executors.newFixedThreadPool(1),
@@ -68,9 +67,9 @@ public class TestReadUtils extends TestDistributedLogBase {
);
}
- private Future<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception {
+ private CompletableFuture<LogRecordWithDLSN> getLastUserRecord(BKDistributedLogManager bkdlm, int ledgerNo) throws Exception {
BKLogReadHandler readHandler = bkdlm.createReadHandler();
- List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+ List<LogSegmentMetadata> ledgerList = Utils.ioResult(
readHandler.readLogSegmentsFromStore(
LogSegmentMetadata.COMPARATOR,
LogSegmentFilter.DEFAULT_FILTER,
@@ -89,8 +88,8 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */);
DLSN dlsn = new DLSN(1,0,0);
- Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals("should be an exact match", dlsn, logrec.getDlsn());
bkdlm.close();
}
@@ -102,8 +101,8 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */);
DLSN dlsn = new DLSN(1,1,0);
- Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals("should be an exact match", dlsn, logrec.getDlsn());
bkdlm.close();
}
@@ -115,8 +114,8 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1 /* txid */);
DLSN dlsn = new DLSN(1,0,1);
- Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals(new DLSN(1,1,0), logrec.getDlsn());
bkdlm.close();
}
@@ -128,8 +127,8 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5 /* user recs */ , 1 /* txid */);
DLSN dlsn = new DLSN(2,0,0);
- Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals(null, logrec);
bkdlm.close();
}
@@ -144,8 +143,8 @@ public class TestReadUtils extends TestDistributedLogBase {
txid += DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5 /* user recs */ , txid);
DLSN dlsn = new DLSN(1,3,0);
- Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 1, dlsn);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 1, dlsn);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals(new DLSN(2,0,0), logrec.getDlsn());
bkdlm.close();
}
@@ -157,8 +156,8 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 5, 1 /* txid */);
DLSN dlsn = new DLSN(1,3,0);
- Future<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getFirstGreaterThanRecord(bkdlm, 0, dlsn);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals(new DLSN(1,5,0), logrec.getDlsn());
bkdlm.close();
}
@@ -169,8 +168,8 @@ public class TestReadUtils extends TestDistributedLogBase {
BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 5, 1 /* txid */);
- Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals(new DLSN(1,9,0), logrec.getDlsn());
bkdlm.close();
}
@@ -182,15 +181,15 @@ public class TestReadUtils extends TestDistributedLogBase {
AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned();
int txid = 1;
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true)));
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, true)));
Utils.close(out);
- Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals(new DLSN(1,2,0), logrec.getDlsn());
bkdlm.close();
}
@@ -201,8 +200,8 @@ public class TestReadUtils extends TestDistributedLogBase {
BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 5 /* control recs */, 0, 1 /* txid */);
- Future<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
- LogRecordWithDLSN logrec = Await.result(futureLogrec);
+ CompletableFuture<LogRecordWithDLSN> futureLogrec = getLastUserRecord(bkdlm, 0);
+ LogRecordWithDLSN logrec = Utils.ioResult(futureLogrec);
assertEquals(null, logrec);
bkdlm.close();
}
@@ -259,7 +258,7 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 1, 1 /* txid */);
Optional<LogRecordWithDLSN> result =
- FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 999L));
+ Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 999L));
assertFalse(result.isPresent());
}
@@ -270,7 +269,7 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 1, 999L /* txid */);
Optional<LogRecordWithDLSN> result =
- FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 99L));
+ Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 99L));
assertTrue(result.isPresent());
assertEquals(999L, result.get().getTransactionId());
assertEquals(0L, result.get().getDlsn().getEntryId());
@@ -284,7 +283,7 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 5, 1L /* txid */);
Optional<LogRecordWithDLSN> result =
- FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 3L));
+ Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 3L));
assertTrue(result.isPresent());
assertEquals(3L, result.get().getTransactionId());
}
@@ -296,7 +295,7 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 100, 1L /* txid */);
Optional<LogRecordWithDLSN> result =
- FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 9L));
+ Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 9L));
assertTrue(result.isPresent());
assertEquals(9L, result.get().getTransactionId());
}
@@ -308,7 +307,7 @@ public class TestReadUtils extends TestDistributedLogBase {
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0 /* control recs */, 100, 1L /* txid */, 3L);
Optional<LogRecordWithDLSN> result =
- FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, 23L));
+ Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, 23L));
assertTrue(result.isPresent());
assertEquals(25L, result.get().getTransactionId());
}
@@ -321,22 +320,22 @@ public class TestReadUtils extends TestDistributedLogBase {
long txid = 1L;
for (int i = 0; i < 10; ++i) {
LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
- Await.result(out.write(record));
+ Utils.ioResult(out.write(record));
txid += 1;
}
long txidToSearch = txid;
for (int i = 0; i < 10; ++i) {
LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txidToSearch);
- Await.result(out.write(record));
+ Utils.ioResult(out.write(record));
}
for (int i = 0; i < 10; ++i) {
LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid);
- Await.result(out.write(record));
+ Utils.ioResult(out.write(record));
txid += 1;
}
Utils.close(out);
Optional<LogRecordWithDLSN> result =
- FutureUtils.result(getLogRecordNotLessThanTxId(bkdlm, 0, txidToSearch));
+ Utils.ioResult(getLogRecordNotLessThanTxId(bkdlm, 0, txidToSearch));
assertTrue(result.isPresent());
assertEquals(10L, result.get().getDlsn().getEntryId());
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java
index ad5bf8e..8d9f846 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestReader.java
@@ -17,13 +17,13 @@
*/
package org.apache.distributedlog;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
@@ -140,8 +140,8 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> {
}
private void readNext() {
- Future<LogRecordWithDLSN> record = reader.readNext();
- record.addEventListener(this);
+ CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
+ record.whenComplete(this);
}
@Override
@@ -184,12 +184,8 @@ public class TestReader implements FutureEventListener<LogRecordWithDLSN> {
private void closeReader() {
if (null != reader) {
- reader.asyncClose().onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable cause) {
- LOG.warn("Exception on closing reader {} : ", readerName, cause);
- return BoxedUnit.UNIT;
- }
+ reader.asyncClose().whenComplete((value, cause) -> {
+ // whenComplete also runs on successful completion (cause == null); only warn
+ // when the close actually failed, matching the onFailure callback it replaces.
+ if (null != cause) {
+ LOG.warn("Exception on closing reader {} : ", readerName, cause);
+ }
});
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
index 9032866..0111e4d 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestRollLogSegments.java
@@ -20,14 +20,16 @@ package org.apache.distributedlog;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.feature.CoreFeatureKeys;
import org.apache.distributedlog.impl.logsegment.BKLogSegmentEntryReader;
import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.feature.SettableFeature;
@@ -35,9 +37,7 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations.FlakyTest;
-import com.twitter.util.Await;
-import com.twitter.util.FutureEventListener;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations.FlakyTest;
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.*;
@@ -79,7 +79,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
// send requests in parallel
for (int i = 1; i <= numEntries; i++) {
final int entryId = i;
- writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() {
+ writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
@@ -125,7 +125,9 @@ public class TestRollLogSegments extends TestDistributedLogBase {
// send requests in parallel to have outstanding requests
for (int i = 1; i <= numEntries; i++) {
final int entryId = i;
- Future<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() {
+ CompletableFuture<DLSN> writeFuture =
+ writer.write(DLMTestUtil.getLogRecordInstance(entryId))
+ .whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
@@ -146,7 +148,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
});
if (i == 1) {
// wait for first log segment created
- FutureUtils.result(writeFuture);
+ Utils.ioResult(writeFuture);
}
}
latch.await();
@@ -191,7 +193,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
long txId = 1L;
// Create Log Segments
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId)));
FailpointUtils.setFailpoint(FailpointUtils.FailPointName.FP_StartLogSegmentBeforeLedgerCreate,
FailpointUtils.FailPointActions.FailPointAction_Throw);
@@ -201,7 +203,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
final int numRecords = 10;
final CountDownLatch latch = new CountDownLatch(numRecords);
for (int i = 0; i < numRecords; i++) {
- writer.write(DLMTestUtil.getLogRecordInstance(++txId)).addEventListener(new FutureEventListener<DLSN>() {
+ writer.write(DLMTestUtil.getLogRecordInstance(++txId)).whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
logger.info("Completed entry : {}.", value);
@@ -266,7 +268,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
// send requests in parallel to have outstanding requests
for (int i = 1; i <= numLogSegments; i++) {
final int entryId = i;
- Future<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).addEventListener(new FutureEventListener<DLSN>() {
+ CompletableFuture<DLSN> writeFuture = writer.write(DLMTestUtil.getLogRecordInstance(entryId)).whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
logger.info("Completed entry {} : {}.", entryId, value);
@@ -279,7 +281,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
});
if (i == 1) {
// wait for first log segment created
- FutureUtils.result(writeFuture);
+ Utils.ioResult(writeFuture);
}
}
latch.await();
@@ -297,7 +299,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
// writer should work after rolling log segments
// there would be (numLogSegments/2) segments based on current rolling policy
for (int i = 1; i <= numLogSegments; i++) {
- DLSN newDLSN = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(numLogSegments + i)));
+ DLSN newDLSN = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(numLogSegments + i)));
logger.info("Completed entry {} : {}", numLogSegments + i, newDLSN);
}
@@ -364,7 +366,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
// 2) reader should be able to read 5 entries.
for (long i = 1; i <= numEntries; i++) {
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
DLMTestUtil.verifyLogRecord(record);
assertEquals(i, record.getTransactionId());
assertEquals(record.getTransactionId() - 1, record.getSequenceId());
@@ -418,7 +420,7 @@ public class TestRollLogSegments extends TestDistributedLogBase {
anotherWriter.closeAndComplete();
for (long i = numEntries + 1; i <= numEntries + 3; i++) {
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
DLMTestUtil.verifyLogRecord(record);
assertEquals(i, record.getTransactionId());
}