You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:39 UTC
[07/23] incubator-distributedlog git commit: DL-124: Use Java8 Future
rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
index 2f5766d..5123178 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestAsyncReaderWriter.java
@@ -22,6 +22,7 @@ import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -30,16 +31,22 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
-import org.apache.distributedlog.config.ConcurrentBaseConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAccessor;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -63,15 +70,10 @@ import org.apache.distributedlog.exceptions.OverCapacityException;
import org.apache.distributedlog.exceptions.ReadCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.lock.DistributedLock;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.SimplePermitLimiter;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
import junit.framework.Assert;
import static com.google.common.base.Charsets.UTF_8;
@@ -111,13 +113,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (long i = 0; i < 3; i++) {
final long currentLogSegmentSeqNo = i + 1;
BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
- DLSN dlsn = Await.result(writer.writeControlRecord(new LogRecord(txid++, "control".getBytes(UTF_8))));
+ DLSN dlsn = Utils.ioResult(writer.writeControlRecord(new LogRecord(txid++, "control".getBytes(UTF_8))));
assertEquals(currentLogSegmentSeqNo, dlsn.getLogSegmentSequenceNo());
assertEquals(0, dlsn.getEntryId());
assertEquals(0, dlsn.getSlotId());
for (long j = 1; j < 10; j++) {
final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
- Await.result(writer.write(record));
+ Utils.ioResult(writer.write(record));
}
writer.closeAndComplete();
}
@@ -161,8 +163,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
// Write one record larger than max seg size. Ledger doesn't roll until next write.
int txid = 1;
LogRecord record = DLMTestUtil.getLogRecordInstance(txid++, 2048);
- Future<DLSN> result = writer.write(record);
- DLSN dlsn = Await.result(result, Duration.fromSeconds(10));
+ CompletableFuture<DLSN> result = writer.write(record);
+ DLSN dlsn = Utils.ioResult(result, 10, TimeUnit.SECONDS);
assertEquals(1, dlsn.getLogSegmentSequenceNo());
record = DLMTestUtil.getLogRecordInstance(txid++, MAX_LOGRECORD_SIZE + 1);
@@ -207,8 +209,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (long j = 0; j < numRecordsPerLogSegment; j++) {
final long currentEntryId = j;
final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
- Future<DLSN> dlsnFuture = writer.write(record);
- dlsnFuture.addEventListener(new FutureEventListener<DLSN>() {
+ CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+ dlsnFuture.whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
if(value.getLogSegmentSequenceNo() != currentLogSegmentSeqNo) {
@@ -245,7 +247,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
assertEquals("Last DLSN" + last.getDlsn() + " isn't the maximum DLSN " + maxDLSN.get(),
last.getDlsn(), maxDLSN.get());
assertEquals(last.getDlsn(), dlm.getLastDLSN());
- assertEquals(last.getDlsn(), Await.result(dlm.getLastDLSNAsync()));
+ assertEquals(last.getDlsn(), Utils.ioResult(dlm.getLastDLSNAsync()));
DLMTestUtil.verifyLargeLogRecord(last);
dlm.close();
@@ -330,8 +332,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
final CountDownLatch syncLatch,
final CountDownLatch completionLatch,
final AtomicBoolean errorsFound) {
- Future<LogRecordWithDLSN> record = reader.readNext();
- record.addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+ CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
+ record.whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
@Override
public void onSuccess(LogRecordWithDLSN value) {
try {
@@ -455,7 +457,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
if (expectedTxID == numLogSegments * numRecordsPerLogSegment) {
break;
}
- List<LogRecordWithDLSN> records = Await.result(reader.readBulk(20));
+ List<LogRecordWithDLSN> records = Utils.ioResult(reader.readBulk(20));
LOG.info("Bulk read {} entries.", records.size());
assertTrue(records.size() >= 1);
@@ -495,7 +497,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (long i = 0; i < 3; i++) {
// since we batched 20 entries into single bookkeeper entry
// we should be able to read 20 entries as a batch.
- List<LogRecordWithDLSN> records = Await.result(reader.readBulk(20));
+ List<LogRecordWithDLSN> records = Utils.ioResult(reader.readBulk(20));
assertEquals(20, records.size());
for (LogRecordWithDLSN record : records) {
assertEquals(expectedTxID, record.getTransactionId());
@@ -531,7 +533,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
name, asyncReader.getStreamName());
long numTrans = 0;
DLSN lastDLSN = DLSN.InvalidDLSN;
- LogRecordWithDLSN record = Await.result(asyncReader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(asyncReader.readNext());
while (null != record) {
DLMTestUtil.verifyEmptyLogRecord(record);
assertEquals(0, record.getDlsn().getSlotId());
@@ -541,7 +543,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
if (numTrans >= (txid - 1)) {
break;
}
- record = Await.result(asyncReader.readNext());
+ record = Utils.ioResult(asyncReader.readNext());
}
assertEquals((txid - 1), numTrans);
Utils.close(asyncReader);
@@ -715,8 +717,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (long j = 0; j < 10; j++) {
final long currentEntryId = j;
final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
- Future<DLSN> dlsnFuture = writer.write(record);
- dlsnFuture.addEventListener(new WriteFutureEventListener(
+ CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+ dlsnFuture.whenComplete(new WriteFutureEventListener(
record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
if (i == 0 && j == 0) {
boolean monotonic = LogSegmentMetadata.supportsSequenceId(logSegmentVersion);
@@ -793,8 +795,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (long j = 0; j < 10; j++) {
final long currentEntryId = j;
final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
- Future<DLSN> dlsnFuture = writer.write(record);
- dlsnFuture.addEventListener(new WriteFutureEventListener(
+ CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+ dlsnFuture.whenComplete(new WriteFutureEventListener(
record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
}
writer.closeAndComplete();
@@ -835,7 +837,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
URI uri = createDLMURI("/" + name);
ensureURICreated(uri);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(confLocal).uri(uri).build();
final DistributedLogManager[] dlms = new DistributedLogManager[count];
final TestReader[] readers = new TestReader[count];
@@ -868,8 +870,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
final long currentEntryId = j;
final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
for (int s = 0; s < count; s++) {
- Future<DLSN> dlsnFuture = writers[s].write(record);
- dlsnFuture.addEventListener(new WriteFutureEventListener(
+ CompletableFuture<DLSN> dlsnFuture = writers[s].write(record);
+ dlsnFuture.whenComplete(new WriteFutureEventListener(
record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
}
}
@@ -937,8 +939,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (long j = 0; j < numRecordsPerLogSegment; j++) {
final long currentEntryId = j;
final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
- Future<DLSN> dlsnFuture = writer.write(record);
- dlsnFuture.addEventListener(new WriteFutureEventListener(
+ CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+ dlsnFuture.whenComplete(new WriteFutureEventListener(
record, currentLogSegmentSeqNo, currentEntryId, writeLatch, writeErrors, true));
}
writer.closeAndComplete();
@@ -988,8 +990,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (long j = 0; j < numRecordsPerLogSegment; j++) {
Thread.sleep(50);
final LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
- Future<DLSN> dlsnFuture = writer.write(record);
- dlsnFuture.addEventListener(new WriteFutureEventListener(
+ CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+ dlsnFuture.whenComplete(new WriteFutureEventListener(
record, currentLogSegmentSeqNo, j, writeLatch, writeErrors, false));
if (i == 0 && j == 0) {
boolean monotonic = LogSegmentMetadata.supportsSequenceId(confLocal.getDLLedgerMetadataLayoutVersion());
@@ -1027,7 +1029,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
writer.closeAndComplete();
final AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN);
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertEquals(1L, record.getTransactionId());
DLMTestUtil.verifyLogRecord(record);
@@ -1037,7 +1039,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
@Override
public void run() {
try {
- Await.result(reader.readNext());
+ Utils.ioResult(reader.readNext());
} catch (ReadCancelledException rce) {
receiveExpectedException.set(true);
} catch (Throwable t) {
@@ -1060,7 +1062,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
// closed reader should reject any readNext
try {
- Await.result(reader.readNext());
+ Utils.ioResult(reader.readNext());
fail("Reader should reject readNext if it is closed.");
} catch (ReadCancelledException rce) {
// expected
@@ -1087,8 +1089,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (long i = 0; i < COUNT; i++) {
Thread.sleep(1);
final LogRecord record = DLMTestUtil.getLogRecordInstance(txid++);
- Future<DLSN> dlsnFuture = writer.write(record);
- dlsnFuture.addEventListener(new FutureEventListener<DLSN>() {
+ CompletableFuture<DLSN> dlsnFuture = writer.write(record);
+ dlsnFuture.whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
syncLatch.countDown();
@@ -1142,10 +1144,10 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
URI uri = createDLMURI("/" + name);
ensureURICreated(uri);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(confLocal).uri(uri).clientId("gabbagoo").build();
DistributedLogManager dlm = namespace.openLog(name);
- DistributedLogNamespace namespace1 = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace1 = NamespaceBuilder.newBuilder()
.conf(confLocal).uri(uri).clientId("tortellini").build();
DistributedLogManager dlm1 = namespace1.openLog(name);
@@ -1153,12 +1155,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
// First write succeeds since lock isnt checked until transmit, which is scheduled
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
writer.flushAndCommit();
BKLogSegmentWriter perStreamWriter = writer.getCachedLogWriter();
DistributedLock lock = perStreamWriter.getLock();
- FutureUtils.result(lock.asyncClose());
+ Utils.ioResult(lock.asyncClose());
// Get second writer, steal lock
BKAsyncLogWriter writer2 = (BKAsyncLogWriter)(dlm1.startAsyncLogSegmentNonPartitioned());
@@ -1169,7 +1171,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
// Succeeds, kicks off scheduled flush
Thread.sleep(100);
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
fail("should have thrown");
} catch (LockingException ex) {
LOG.debug("caught exception ", ex);
@@ -1194,13 +1196,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
dlm = createNewDLM(confLocal, runtime.getMethodName());
}
BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
- ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(1000);
+ ArrayList<CompletableFuture<DLSN>> results = new ArrayList<CompletableFuture<DLSN>>(1000);
for (int i = 0; i < 1000; i++) {
results.add(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
}
- for (Future<DLSN> result : results) {
+ for (CompletableFuture<DLSN> result : results) {
try {
- Await.result(result);
+ Utils.ioResult(result);
if (shouldFail) {
fail("should fail due to no outstanding writes permitted");
}
@@ -1242,12 +1244,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
confLocal.setOutstandingWriteLimitDarkmode(true);
DistributedLogManager dlm = createNewDLM(confLocal, runtime.getMethodName());
BKAsyncLogWriter writer = (BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
- ArrayList<Future<DLSN>> results = new ArrayList<Future<DLSN>>(1000);
+ ArrayList<CompletableFuture<DLSN>> results = new ArrayList<CompletableFuture<DLSN>>(1000);
for (int i = 0; i < 1000; i++) {
results.add(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
}
- for (Future<DLSN> result : results) {
- Await.result(result);
+ for (CompletableFuture<DLSN> result : results) {
+ Utils.ioResult(result);
}
writer.closeAndComplete();
dlm.close();
@@ -1266,7 +1268,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
long txId = 1L;
for (int i = 0; i < 5; i++) {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
}
BKLogSegmentWriter logWriter = writer.getCachedLogWriter();
@@ -1277,7 +1279,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
BookKeeper.DigestType.CRC32, confLocal.getBKDigestPW().getBytes(UTF_8));
try {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
fail("Should fail write to a fenced ledger with BKTransmitException");
} catch (BKTransmitException bkte) {
// expected
@@ -1310,7 +1312,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
long txId = 1L;
for (int i = 0; i < 5; i++) {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
}
BKLogSegmentWriter logWriter = writer.getCachedLogWriter();
@@ -1408,8 +1410,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
int recordCount = 0;
try {
while (true) {
- Future<LogRecordWithDLSN> record = reader.readNext();
- Await.result(record);
+ CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
+ Utils.ioResult(record);
recordCount++;
if (recordCount >= segmentSize * numSegments) {
@@ -1465,7 +1467,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
BKAsyncLogWriter writer =
(BKAsyncLogWriter)(dlm.startAsyncLogSegmentNonPartitioned());
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
writer.abort();
for (int i = 0; i < 2; i++) {
@@ -1548,8 +1550,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
int recordCount = 0;
try {
while (true) {
- Future<LogRecordWithDLSN> record = reader.readNext();
- Await.result(record);
+ CompletableFuture<LogRecordWithDLSN> record = reader.readNext();
+ Utils.ioResult(record);
if (recordCount == 0) {
readLatch.countDown();
}
@@ -1582,7 +1584,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
int numRecords = 10;
for (int i = 0; i < numRecords; i++) {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
assertEquals("last tx id should become " + i,
i, writer.getLastTxId());
}
@@ -1612,16 +1614,16 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
int numRecords = 40;
for (int i = 1; i <= numRecords; i++) {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
assertEquals("last tx id should become " + i,
i, writer.getLastTxId());
}
LogRecord record = DLMTestUtil.getLogRecordInstance(numRecords);
record.setControl();
- Await.result(writer.write(record));
+ Utils.ioResult(writer.write(record));
BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
- record = Await.result(reader.readNext());
+ record = Utils.ioResult(reader.readNext());
LOG.info("Read record {}", record);
assertEquals(1L, record.getTransactionId());
@@ -1629,7 +1631,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
assertTrue(reader.getReadAheadReader().getNumCachedEntries() <= maxAllowedCachedRecords);
for (int i = 2; i <= numRecords; i++) {
- record = Await.result(reader.readNext());
+ record = Utils.ioResult(reader.readNext());
LOG.info("Read record {}", record);
assertEquals((long) i, record.getTransactionId());
TimeUnit.MILLISECONDS.sleep(20);
@@ -1656,18 +1658,18 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
final int NUM_RECORDS = 10;
int i = 1;
for (; i <= NUM_RECORDS; i++) {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
assertEquals("last tx id should become " + i,
i, writer.getLastTxId());
}
- Await.result(writer.markEndOfStream());
+ Utils.ioResult(writer.markEndOfStream());
// Multiple end of streams are ok.
- Await.result(writer.markEndOfStream());
+ Utils.ioResult(writer.markEndOfStream());
try {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(i)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(i)));
fail("Should have thrown");
} catch (EndOfStreamException ex) {
}
@@ -1675,12 +1677,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
LogRecord record = null;
for (int j = 0; j < NUM_RECORDS; j++) {
- record = Await.result(reader.readNext());
+ record = Utils.ioResult(reader.readNext());
assertEquals(j+1, record.getTransactionId());
}
try {
- record = Await.result(reader.readNext());
+ record = Utils.ioResult(reader.readNext());
fail("Should have thrown");
} catch (EndOfStreamException ex) {
}
@@ -1698,9 +1700,9 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
DistributedLogManager dlm = createNewDLM(confLocal, name);
BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
- Await.result(writer.markEndOfStream());
+ Utils.ioResult(writer.markEndOfStream());
try {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(1)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1)));
fail("Should have thrown");
} catch (EndOfStreamException ex) {
}
@@ -1708,7 +1710,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
try {
- LogRecord record = Await.result(reader.readNext());
+ LogRecord record = Utils.ioResult(reader.readNext());
fail("Should have thrown");
} catch (EndOfStreamException ex) {
}
@@ -1726,32 +1728,32 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
DistributedLogManager dlm = createNewDLM(confLocal, name);
BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(1L);
controlRecord.setControl();
- FutureUtils.result(writer.write(controlRecord));
+ Utils.ioResult(writer.write(controlRecord));
BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
- Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- Future<LogRecordWithDLSN> readFuture = reader.readNext();
+ CompletableFuture<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext();
// write another records
for (int i = 0; i < 5; i++) {
long txid = 2L + i;
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
controlRecord = DLMTestUtil.getLogRecordInstance(txid);
controlRecord.setControl();
- FutureUtils.result(writer.write(controlRecord));
+ Utils.ioResult(writer.write(controlRecord));
}
- List<LogRecordWithDLSN> bulkReadRecords = FutureUtils.result(bulkReadFuture);
+ List<LogRecordWithDLSN> bulkReadRecords = Utils.ioResult(bulkReadFuture);
assertEquals(2, bulkReadRecords.size());
assertEquals(1L, bulkReadRecords.get(0).getTransactionId());
assertEquals(2L, bulkReadRecords.get(1).getTransactionId());
for (LogRecordWithDLSN record : bulkReadRecords) {
DLMTestUtil.verifyLogRecord(record);
}
- LogRecordWithDLSN record = FutureUtils.result(readFuture);
+ LogRecordWithDLSN record = Utils.ioResult(readFuture);
assertEquals(3L, record.getTransactionId());
DLMTestUtil.verifyLogRecord(record);
@@ -1771,16 +1773,16 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
DistributedLogManager dlm = createNewDLM(confLocal, name);
BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
LogRecord controlRecord = DLMTestUtil.getLogRecordInstance(1L);
controlRecord.setControl();
- FutureUtils.result(writer.write(controlRecord));
+ Utils.ioResult(writer.write(controlRecord));
BKAsyncLogReader reader = (BKAsyncLogReader) dlm.getAsyncLogReader(DLSN.InitialDLSN);
- Future<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS);
- Future<LogRecordWithDLSN> readFuture = reader.readNext();
+ CompletableFuture<List<LogRecordWithDLSN>> bulkReadFuture = reader.readBulk(2, 0, TimeUnit.MILLISECONDS);
+ CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext();
- List<LogRecordWithDLSN> bulkReadRecords = FutureUtils.result(bulkReadFuture);
+ List<LogRecordWithDLSN> bulkReadRecords = Utils.ioResult(bulkReadFuture);
assertEquals(1, bulkReadRecords.size());
assertEquals(1L, bulkReadRecords.get(0).getTransactionId());
for (LogRecordWithDLSN record : bulkReadRecords) {
@@ -1790,13 +1792,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
// write another records
for (int i = 0; i < 5; i++) {
long txid = 2L + i;
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid)));
controlRecord = DLMTestUtil.getLogRecordInstance(txid);
controlRecord.setControl();
- FutureUtils.result(writer.write(controlRecord));
+ Utils.ioResult(writer.write(controlRecord));
}
- LogRecordWithDLSN record = FutureUtils.result(readFuture);
+ LogRecordWithDLSN record = Utils.ioResult(readFuture);
assertEquals(2L, record.getTransactionId());
DLMTestUtil.verifyLogRecord(record);
@@ -1832,7 +1834,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
// 3 segments, 10 records each, immediate flush, batch size 1, so just the first
// record in each ledger is discarded, for 30 - 3 = 27 records.
for (int i = 0; i < 27; i++) {
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertFalse(record.getDlsn().getEntryId() % 10 == 0);
}
@@ -1868,7 +1870,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
// 3 segments, 10 records each, immediate flush, batch size 1, so just the first
// record in each ledger is discarded, for 30 - 3 = 27 records.
for (int i = 0; i < 30; i++) {
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertFalse(record.getDlsn().getEntryId() % 10 == 0);
}
fail("should have thrown");
@@ -1909,7 +1911,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
// 3. ranges 6-10, 7-11, 8-12, 9-13 will be bad
// And so on, so 5 records in each 10 will be discarded, for 50 good records.
for (int i = 0; i < 50; i++) {
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertFalse(record.getDlsn().getEntryId() % 10 == 0);
}
@@ -1946,7 +1948,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
// 2. range 1-8 will be good, but only contain 4 records
// And so on for the next segment, so 4 records in each segment, for 12 good records
for (int i = 0; i < 12; i++) {
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertFalse(record.getDlsn().getEntryId() % 10 == 0);
}
@@ -1970,13 +1972,13 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
URI uri = createDLMURI("/" + name);
ensureURICreated(uri);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(confLocal).uri(uri).build();
// use the pool
DistributedLogManager dlm = namespace.openLog(name + "-pool");
AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
List<LogSegmentMetadata> segments = dlm.getLogSegments();
assertEquals(1, segments.size());
long ledgerId = segments.get(0).getLogSegmentId();
@@ -1995,7 +1997,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
Optional.of(dynConf),
Optional.<StatsLogger>absent());
writer = dlm.startAsyncLogSegmentNonPartitioned();
- FutureUtils.result(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(1L)));
segments = dlm.getLogSegments();
assertEquals(1, segments.size());
ledgerId = segments.get(0).getLogSegmentId();
@@ -2023,17 +2025,17 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
DistributedLogManager dlm = createNewDLM(confLocal, name);
BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
- List<Future<DLSN>> writeFutures = Lists.newArrayList();
+ List<CompletableFuture<DLSN>> writeFutures = Lists.newArrayList();
for (int i = 0; i < 5; i++) {
LogRecord record = DLMTestUtil.getLogRecordInstance(1L + i);
writeFutures.add(writer.write(record));
}
- List<Future<DLSN>> recordSetFutures = Lists.newArrayList();
+ List<CompletableFuture<DLSN>> recordSetFutures = Lists.newArrayList();
// write another 5 records
final LogRecordSet.Writer recordSetWriter = LogRecordSet.newWriter(4096, CompressionCodec.Type.LZ4);
for (int i = 0; i < 5; i++) {
LogRecord record = DLMTestUtil.getLogRecordInstance(6L + i);
- Promise<DLSN> writePromise = new Promise<DLSN>();
+ CompletableFuture<DLSN> writePromise = new CompletableFuture<DLSN>();
recordSetWriter.writeRecord(ByteBuffer.wrap(record.getPayload()), writePromise);
recordSetFutures.add(writePromise);
}
@@ -2042,8 +2044,8 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
recordSetBuffer.get(data);
LogRecord setRecord = new LogRecord(6L, data);
setRecord.setRecordSet();
- Future<DLSN> writeRecordSetFuture = writer.write(setRecord);
- writeRecordSetFuture.addEventListener(new FutureEventListener<DLSN>() {
+ CompletableFuture<DLSN> writeRecordSetFuture = writer.write(setRecord);
+ writeRecordSetFuture.whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN dlsn) {
recordSetWriter.completeTransmit(
@@ -2058,20 +2060,20 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
}
});
writeFutures.add(writeRecordSetFuture);
- FutureUtils.result(writeRecordSetFuture);
+ Utils.ioResult(writeRecordSetFuture);
// write last 5 records
for (int i = 0; i < 5; i++) {
LogRecord record = DLMTestUtil.getLogRecordInstance(11L + i);
- Future<DLSN> writeFuture = writer.write(record);
+ CompletableFuture<DLSN> writeFuture = writer.write(record);
writeFutures.add(writeFuture);
// make sure get log record count returns the right count
if (i == 0) {
- FutureUtils.result(writeFuture);
+ Utils.ioResult(writeFuture);
assertEquals(10, dlm.getLogRecordCount());
}
}
- List<DLSN> writeResults = FutureUtils.result(Future.collect(writeFutures));
+ List<DLSN> writeResults = Utils.ioResult(FutureUtils.collect(writeFutures));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, i, 0L), writeResults.get(i));
@@ -2080,12 +2082,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, 6L + i, 0L), writeResults.get(6 + i));
}
- List<DLSN> recordSetWriteResults = Await.result(Future.collect(recordSetFutures));
+ List<DLSN> recordSetWriteResults = Utils.ioResult(FutureUtils.collect(recordSetFutures));
for (int i = 0; i < 5; i++) {
Assert.assertEquals(new DLSN(1L, 5L, i), recordSetWriteResults.get(i));
}
- FutureUtils.result(writer.flushAndCommit());
+ Utils.ioResult(writer.flushAndCommit());
DistributedLogConfiguration readConf1 = new DistributedLogConfiguration();
readConf1.addConfiguration(confLocal);
@@ -2094,7 +2096,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
DistributedLogManager readDLM1 = createNewDLM(readConf1, name);
AsyncLogReader reader1 = readDLM1.getAsyncLogReader(DLSN.InitialDLSN);
for (int i = 0; i < 15; i++) {
- LogRecordWithDLSN record = FutureUtils.result(reader1.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader1.readNext());
if (i < 5) {
assertEquals(new DLSN(1L, i, 0L), record.getDlsn());
assertEquals(1L + i, record.getTransactionId());
@@ -2118,7 +2120,7 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
DistributedLogManager readDLM2 = createNewDLM(readConf2, name);
AsyncLogReader reader2 = readDLM2.getAsyncLogReader(DLSN.InitialDLSN);
for (int i = 0; i < 11; i++) {
- LogRecordWithDLSN record = FutureUtils.result(reader2.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader2.readNext());
LOG.info("Read record {}", record);
if (i < 5) {
assertEquals(new DLSN(1L, i, 0L), record.getDlsn());
@@ -2159,12 +2161,12 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
ensureURICreated(uri);
DistributedLogManager dlm = createNewDLM(confLocal, name);
- BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+ BKAsyncLogWriter writer = (BKAsyncLogWriter) Utils.ioResult(dlm.openAsyncLogWriter());
writer.write(DLMTestUtil.getLogRecordInstance(1L));
- AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+ AsyncLogReader reader = Utils.ioResult(dlm.openAsyncLogReader(DLSN.InitialDLSN));
try {
- FutureUtils.result(reader.readNext());
+ Utils.ioResult(reader.readNext());
fail("Should fail when stream is idle");
} catch (IdleReaderException ire) {
// expected
@@ -2191,11 +2193,11 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
ensureURICreated(uri);
DistributedLogManager dlm = createNewDLM(confLocal, name);
- BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+ BKAsyncLogWriter writer = (BKAsyncLogWriter) Utils.ioResult(dlm.openAsyncLogWriter());
writer.write(DLMTestUtil.getLogRecordInstance(1L));
- AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
- LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
+ AsyncLogReader reader = Utils.ioResult(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertEquals(1L, record.getTransactionId());
DLMTestUtil.verifyLogRecord(record);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index dff0133..18e097f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -23,10 +23,19 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.api.MetadataAccessor;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.exceptions.LogEmptyException;
@@ -35,7 +44,6 @@ import org.apache.distributedlog.exceptions.LogReadException;
import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
import org.apache.distributedlog.io.Abortables;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.apache.bookkeeper.client.BKException;
@@ -54,12 +62,8 @@ import org.apache.distributedlog.exceptions.TransactionIdOutOfOrderException;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.MetadataUpdater;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
@@ -89,7 +93,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
BKLogWriteHandler blplm = dlm.createWriteHandler(true);
assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
}
LogWriter writer = dlm.startLogSegmentNonPartitioned();
@@ -129,7 +133,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
BKLogWriteHandler blplm = dlm.createWriteHandler(true);
assertNotNull(zkc.exists(blplm.completedLedgerZNode(1, 100,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
}
@Test(timeout = 60000)
@@ -167,7 +171,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
assertNotNull(
zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
}
BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
@@ -263,14 +267,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
confLocal.setWriteLockEnabled(false);
String name = "distrlog-two-writers-lock-disabled";
DistributedLogManager manager = createNewDLM(confLocal, name);
- AsyncLogWriter writer1 = FutureUtils.result(manager.openAsyncLogWriter());
- FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(1L)));
- AsyncLogWriter writer2 = FutureUtils.result(manager.openAsyncLogWriter());
- FutureUtils.result(writer2.write(DLMTestUtil.getLogRecordInstance(2L)));
+ AsyncLogWriter writer1 = Utils.ioResult(manager.openAsyncLogWriter());
+ Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(1L)));
+ AsyncLogWriter writer2 = Utils.ioResult(manager.openAsyncLogWriter());
+ Utils.ioResult(writer2.write(DLMTestUtil.getLogRecordInstance(2L)));
// write a record to writer 1 again
try {
- FutureUtils.result(writer1.write(DLMTestUtil.getLogRecordInstance(3L)));
+ Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(3L)));
fail("Should fail writing record to writer 1 again as writer 2 took over the ownership");
} catch (BKTransmitException bkte) {
assertEquals(BKException.Code.LedgerFencedException, bkte.getBKResultCode());
@@ -311,7 +315,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
assertNotNull(
zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
}
BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
for (long j = 1; j <= DEFAULT_SEGMENT_SIZE / 2; j++) {
@@ -394,7 +398,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
assertNotNull(
zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
}
BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
@@ -411,14 +415,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
AsyncLogReader asyncreader = dlm.getAsyncLogReader(DLSN.InvalidDLSN);
long numTrans = 0;
- LogRecordWithDLSN record = Await.result(asyncreader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(asyncreader.readNext());
while (null != record) {
DLMTestUtil.verifyLogRecord(record);
numTrans++;
if (numTrans >= (txid - 1)) {
break;
}
- record = Await.result(asyncreader.readNext());
+ record = Utils.ioResult(asyncreader.readNext());
}
assertEquals((txid - 1), numTrans);
Utils.close(asyncreader);
@@ -459,12 +463,12 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
dlm.close();
URI uri = createDLMURI("/" + name);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf).uri(uri).build();
assertTrue(namespace.logExists(name));
assertFalse(namespace.logExists("non-existent-log"));
URI nonExistentUri = createDLMURI("/" + "non-existent-ns");
- DistributedLogNamespace nonExistentNS = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace nonExistentNS = NamespaceBuilder.newBuilder()
.conf(conf).uri(nonExistentUri).build();
assertFalse(nonExistentNS.logExists(name));
@@ -508,31 +512,31 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
SubscriptionsStore store = dlm.getSubscriptionsStore();
// no data
- assertEquals(Await.result(store.getLastCommitPosition(subscriber0)), DLSN.NonInclusiveLowerBound);
- assertEquals(Await.result(store.getLastCommitPosition(subscriber1)), DLSN.NonInclusiveLowerBound);
- assertEquals(Await.result(store.getLastCommitPosition(subscriber2)), DLSN.NonInclusiveLowerBound);
+ assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber0)), DLSN.NonInclusiveLowerBound);
+ assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber1)), DLSN.NonInclusiveLowerBound);
+ assertEquals(Utils.ioResult(store.getLastCommitPosition(subscriber2)), DLSN.NonInclusiveLowerBound);
// empty
- assertTrue(Await.result(store.getLastCommitPositions()).isEmpty());
+ assertTrue(Utils.ioResult(store.getLastCommitPositions()).isEmpty());
// subscriber 0 advance
- Await.result(store.advanceCommitPosition(subscriber0, commitPosition0));
- assertEquals(commitPosition0, Await.result(store.getLastCommitPosition(subscriber0)));
- Map<String, DLSN> committedPositions = Await.result(store.getLastCommitPositions());
+ Utils.ioResult(store.advanceCommitPosition(subscriber0, commitPosition0));
+ assertEquals(commitPosition0, Utils.ioResult(store.getLastCommitPosition(subscriber0)));
+ Map<String, DLSN> committedPositions = Utils.ioResult(store.getLastCommitPositions());
assertEquals(1, committedPositions.size());
assertEquals(commitPosition0, committedPositions.get(subscriber0));
// subscriber 1 advance
- Await.result(store.advanceCommitPosition(subscriber1, commitPosition1));
- assertEquals(commitPosition1, Await.result(store.getLastCommitPosition(subscriber1)));
- committedPositions = Await.result(store.getLastCommitPositions());
+ Utils.ioResult(store.advanceCommitPosition(subscriber1, commitPosition1));
+ assertEquals(commitPosition1, Utils.ioResult(store.getLastCommitPosition(subscriber1)));
+ committedPositions = Utils.ioResult(store.getLastCommitPositions());
assertEquals(2, committedPositions.size());
assertEquals(commitPosition0, committedPositions.get(subscriber0));
assertEquals(commitPosition1, committedPositions.get(subscriber1));
// subscriber 2 advance
- Await.result(store.advanceCommitPosition(subscriber2, commitPosition2));
- assertEquals(commitPosition2, Await.result(store.getLastCommitPosition(subscriber2)));
- committedPositions = Await.result(store.getLastCommitPositions());
+ Utils.ioResult(store.advanceCommitPosition(subscriber2, commitPosition2));
+ assertEquals(commitPosition2, Utils.ioResult(store.getLastCommitPosition(subscriber2)));
+ committedPositions = Utils.ioResult(store.getLastCommitPositions());
assertEquals(3, committedPositions.size());
assertEquals(commitPosition0, committedPositions.get(subscriber0));
assertEquals(commitPosition1, committedPositions.get(subscriber1));
@@ -541,11 +545,11 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
// subscriber 2 advance again
DistributedLogManager newDLM = createNewDLM(conf, name);
SubscriptionsStore newStore = newDLM.getSubscriptionsStore();
- Await.result(newStore.advanceCommitPosition(subscriber2, commitPosition3));
+ Utils.ioResult(newStore.advanceCommitPosition(subscriber2, commitPosition3));
newStore.close();
newDLM.close();
- committedPositions = Await.result(store.getLastCommitPositions());
+ committedPositions = Utils.ioResult(store.getLastCommitPositions());
assertEquals(3, committedPositions.size());
assertEquals(commitPosition0, committedPositions.get(subscriber0));
assertEquals(commitPosition1, committedPositions.get(subscriber1));
@@ -570,13 +574,13 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
} else {
writer.markEndOfStream();
BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, DistributedLogConstants.MAX_TXID,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
}
}
return txid;
@@ -698,8 +702,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
@Test(timeout = 60000, expected = LogRecordTooLongException.class)
public void testMaxLogRecSize() throws Exception {
DistributedLogManager dlm = createNewDLM(conf, "distrlog-maxlogRecSize");
- AsyncLogWriter writer = FutureUtils.result(dlm.openAsyncLogWriter());
- FutureUtils.result(writer.write(new LogRecord(1L, DLMTestUtil.repeatString(
+ AsyncLogWriter writer = Utils.ioResult(dlm.openAsyncLogWriter());
+ Utils.ioResult(writer.write(new LogRecord(1L, DLMTestUtil.repeatString(
DLMTestUtil.repeatString("abcdefgh", 256), 512).getBytes())));
}
@@ -710,21 +714,21 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
confLocal.setOutputBufferSize(1024 * 1024);
BKDistributedLogManager dlm =
createNewDLM(confLocal, "distrlog-transmissionSize");
- AsyncLogWriter out = FutureUtils.result(dlm.openAsyncLogWriter());
+ AsyncLogWriter out = Utils.ioResult(dlm.openAsyncLogWriter());
boolean exceptionEncountered = false;
byte[] largePayload = new byte[(LogRecord.MAX_LOGRECORDSET_SIZE / 2) + 2];
RAND.nextBytes(largePayload);
try {
LogRecord op = new LogRecord(1L, largePayload);
- Future<DLSN> firstWriteFuture = out.write(op);
+ CompletableFuture<DLSN> firstWriteFuture = out.write(op);
op = new LogRecord(2L, largePayload);
// the second write will flush the first one, since we reached the maximum transmission size.
out.write(op);
- FutureUtils.result(firstWriteFuture);
+ Utils.ioResult(firstWriteFuture);
} catch (LogRecordTooLongException exc) {
exceptionEncountered = true;
} finally {
- FutureUtils.result(out.asyncClose());
+ Utils.ioResult(out.asyncClose());
}
assertFalse(exceptionEncountered);
Abortables.abortQuietly(out);
@@ -750,7 +754,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
BKLogWriteHandler blplm = ((BKDistributedLogManager) (dlm)).createWriteHandler(true);
assertNotNull(zkc.exists(blplm.completedLedgerZNode(start, txid - 1,
perStreamLogWriter.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
}
LogReader reader = dlm.getInputStream(1);
@@ -819,7 +823,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
assertNotNull(
zkc.exists(blplm.completedLedgerZNode(txid - 1, txid - 1,
writer.getLogSegmentSequenceNumber()), false));
- FutureUtils.result(blplm.asyncClose());
+ Utils.ioResult(blplm.asyncClose());
}
BKSyncLogWriter out = (BKSyncLogWriter)dlm.startLogSegmentNonPartitioned();
@@ -857,7 +861,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(conf, name);
- FutureUtils.result(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true));
+ Utils.ioResult(dlm.getWriterMetadataStore().getLog(dlm.getUri(), name, true, true));
dlm.registerListener(new LogSegmentListener() {
@Override
public void onSegmentsUpdated(List<LogSegmentMetadata> segments) {
@@ -931,7 +935,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
for (int i = 0; i < 10; i++) {
LogRecord record = DLMTestUtil.getLogRecordInstance(txid++);
record.setControl();
- Await.result(writer.writeControlRecord(record));
+ Utils.ioResult(writer.writeControlRecord(record));
}
LOG.info("10 control records are written");
@@ -946,14 +950,14 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
LOG.info("Completed first log segment");
writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
LOG.info("Completed second log segment");
LOG.info("Writing another 10 control records");
for (int i = 1; i < 10; i++) {
LogRecord record = DLMTestUtil.getLogRecordInstance(txid++);
record.setControl();
- Await.result(writer.write(record));
+ Utils.ioResult(writer.write(record));
}
assertEquals(new DLSN(2, 0, 0), dlm.getLastDLSN());
@@ -973,8 +977,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
BKAsyncLogWriter writer = (BKAsyncLogWriter) dlm.startAsyncLogSegmentNonPartitioned();
DLMTestUtil.generateCompletedLogSegments(dlm, conf, 2, 10);
- Future<Long> futureCount = dlm.getLogRecordCountAsync(DLSN.InitialDLSN);
- Long count = Await.result(futureCount, Duration.fromSeconds(2));
+ CompletableFuture<Long> futureCount = dlm.getLogRecordCountAsync(DLSN.InitialDLSN);
+ Long count = Utils.ioResult(futureCount, 2, TimeUnit.SECONDS);
assertEquals(20, count.longValue());
writer.close();
@@ -986,7 +990,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
String baseName = testNames.getMethodName();
String streamName = "\0blah";
URI uri = createDLMURI("/" + baseName);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf).uri(uri).build();
DistributedLogManager dlm = null;
@@ -1036,15 +1040,15 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
for (long j = 1; j <= 10; j++) {
LogRecord record = DLMTestUtil.getLargeLogRecordInstance(txid++);
- Future<DLSN> dlsn = writer.write(record);
+ CompletableFuture<DLSN> dlsn = writer.write(record);
if (i == 1 && j == 2) {
- truncDLSN = Await.result(dlsn);
+ truncDLSN = Utils.ioResult(dlsn);
} else if (i == 2 && j == 3) {
- beyondTruncDLSN = Await.result(dlsn);
+ beyondTruncDLSN = Utils.ioResult(dlsn);
beyondTruncTxId = record.getTransactionId();
} else if (j == 10) {
- Await.ready(dlsn);
+ Utils.ioResult(dlsn);
}
}
@@ -1065,7 +1069,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(
confLocal, metadataStore);
- FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(1L)));
+ Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(1L)));
segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName()));
@@ -1088,7 +1092,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
}
updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore);
- FutureUtils.result(updater.setLogSegmentActive(segmentList.get(1L)));
+ Utils.ioResult(updater.setLogSegmentActive(segmentList.get(1L)));
segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName()));
@@ -1096,7 +1100,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
LOG.info("Read segments after marked first segment as active : {}", segmentList);
updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(confLocal, metadataStore);
- FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(2L)));
+ Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(2L)));
segmentList = DLMTestUtil.readLogSegments(zookeeperClient,
LogMetadata.getLogSegmentsPath(uri, name, confLocal.getUnpartitionedStreamName()));
@@ -1109,7 +1113,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
boolean exceptionEncountered = false;
try {
for (int i = 0; i < 3 * 10; i++) {
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
DLMTestUtil.verifyLargeLogRecord(record);
assertEquals(expectedTxId, record.getTransactionId());
expectedTxId++;
@@ -1122,10 +1126,10 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
}
updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
- FutureUtils.result(updater.setLogSegmentActive(segmentList.get(2L)));
+ Utils.ioResult(updater.setLogSegmentActive(segmentList.get(2L)));
BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
- Assert.assertTrue(Await.result(writer.truncate(truncDLSN)));
+ Assert.assertTrue(Utils.ioResult(writer.truncate(truncDLSN)));
BKLogWriteHandler handler = writer.getCachedWriteHandler();
List<LogSegmentMetadata> cachedSegments = handler.getCachedLogSegments(LogSegmentMetadata.COMPARATOR);
for (LogSegmentMetadata segment: cachedSegments) {
@@ -1164,7 +1168,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
{
AsyncLogReader reader = dlm.getAsyncLogReader(DLSN.InitialDLSN);
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertTrue(record != null);
assertEquals(truncDLSN, record.getDlsn());
Utils.close(reader);
@@ -1190,7 +1194,7 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
{
AsyncLogReader reader = dlm.getAsyncLogReader(beyondTruncDLSN);
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertTrue(record != null);
assertEquals(beyondTruncDLSN, record.getDlsn());
Utils.close(reader);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
index e0f2bab..2078a88 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKDistributedLogNamespace.java
@@ -28,14 +28,17 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Sets;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.util.DLUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -95,8 +98,8 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
DistributedLogConfiguration newConf = new DistributedLogConfiguration();
newConf.addConfiguration(conf);
newConf.setCreateStreamIfNotExists(false);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
- .conf(newConf).uri(uri).build();
+ Namespace namespace = NamespaceBuilder.newBuilder()
+ .conf(newConf).uri(uri).build();
DistributedLogManager dlm = namespace.openLog(logName);
LogWriter writer;
try {
@@ -118,7 +121,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
newConf.addConfiguration(conf);
newConf.setCreateStreamIfNotExists(false);
String streamName = "test-stream";
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(newConf).uri(uri).build();
DistributedLogManager dlm = namespace.openLog(streamName);
LogWriter writer;
@@ -148,7 +151,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
URI uri = createDLMURI("/" + runtime.getMethodName());
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf).uri(uri).build();
try {
@@ -225,7 +228,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
public void testNamespaceListener() throws Exception {
URI uri = createDLMURI("/" + runtime.getMethodName());
zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf).uri(uri).build();
final CountDownLatch[] latches = new CountDownLatch[3];
for (int i = 0; i < 3; i++) {
@@ -268,7 +271,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
newConf.addConfiguration(conf);
newConf.setCreateStreamIfNotExists(true);
newConf.setZkAclId(un);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(newConf).uri(uri).build();
DistributedLogManager dlm = namespace.openLog(streamName);
LogWriter writer = dlm.startLogSegmentNonPartitioned();
@@ -400,7 +403,7 @@ public class TestBKDistributedLogNamespace extends TestDistributedLogBase {
@Test(timeout = 60000)
public void testUseNamespaceAfterCloseShouldFailFast() throws Exception {
URI uri = createDLMURI("/" + runtime.getMethodName());
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf)
.uri(uri)
.build();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
index 854fd61..4915137 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestBKLogReadHandler.java
@@ -18,20 +18,21 @@
package org.apache.distributedlog;
import com.google.common.base.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogWriter;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.logsegment.LogSegmentFilter;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.Await;
import java.util.List;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
-import com.twitter.util.TimeoutException;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
@@ -78,21 +79,21 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
DistributedLogManager dlm1 = createNewDLM(confLocal, dlName);
long txid = 1;
- ArrayList<Future<DLSN>> futures = new ArrayList<Future<DLSN>>(numEntriesPerSegment);
+ ArrayList<CompletableFuture<DLSN>> futures = new ArrayList<CompletableFuture<DLSN>>(numEntriesPerSegment);
AsyncLogWriter out = dlm1.startAsyncLogSegmentNonPartitioned();
for (int eid = 0; eid < numEntriesPerSegment; ++eid) {
futures.add(out.write(DLMTestUtil.getLogRecordInstance(txid)));
++txid;
}
- FutureUtils.result(Future.collect(futures));
+ Utils.ioResult(FutureUtils.collect(futures));
// commit
LogRecord controlRecord = new LogRecord(txid, DistributedLogConstants.CONTROL_RECORD_CONTENT);
controlRecord.setControl();
- FutureUtils.result(out.write(controlRecord));
+ Utils.ioResult(out.write(controlRecord));
DLSN last = dlm1.getLastDLSN();
assertEquals(new DLSN(1,9,0), last);
- DLSN first = Await.result(dlm1.getFirstDLSNAsync());
+ DLSN first = Utils.ioResult(dlm1.getFirstDLSNAsync());
assertEquals(new DLSN(1,0,0), first);
Utils.close(out);
}
@@ -102,9 +103,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
String dlName = runtime.getMethodName();
BKDistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = dlm.createReadHandler();
- Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
+ CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
try {
- Await.result(futureRecord);
+ Utils.ioResult(futureRecord);
fail("should have thrown exception");
} catch (LogNotFoundException ex) {
}
@@ -116,9 +117,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
BKDistributedLogManager dlm = createNewDLM(conf, dlName);
DLMTestUtil.generateCompletedLogSegments(dlm, conf, 3, 3);
BKLogReadHandler readHandler = dlm.createReadHandler();
- Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
+ CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
try {
- LogRecordWithDLSN record = Await.result(futureRecord);
+ LogRecordWithDLSN record = Utils.ioResult(futureRecord);
assertEquals(new DLSN(1, 0, 0), record.getDlsn());
} catch (Exception ex) {
fail("should not have thrown exception: " + ex);
@@ -133,11 +134,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
BKLogReadHandler readHandler =
((BKDistributedLogManager) dlm).createReadHandler();
AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
- Future<Boolean> futureSuccess = writer.truncate(new DLSN(2, 0, 0));
- Boolean success = Await.result(futureSuccess);
+ CompletableFuture<Boolean> futureSuccess = writer.truncate(new DLSN(2, 0, 0));
+ Boolean success = Utils.ioResult(futureSuccess);
assertTrue(success);
- Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
- LogRecordWithDLSN record = Await.result(futureRecord);
+ CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
+ LogRecordWithDLSN record = Utils.ioResult(futureRecord);
assertEquals(new DLSN(2, 0, 0), record.getDlsn());
}
@@ -151,11 +152,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
// Only truncates at ledger boundary.
- Future<Boolean> futureSuccess = writer.truncate(new DLSN(2, 5, 0));
- Boolean success = Await.result(futureSuccess);
+ CompletableFuture<Boolean> futureSuccess = writer.truncate(new DLSN(2, 5, 0));
+ Boolean success = Utils.ioResult(futureSuccess);
assertTrue(success);
- Future<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
- LogRecordWithDLSN record = Await.result(futureRecord);
+ CompletableFuture<LogRecordWithDLSN> futureRecord = readHandler.asyncGetFirstLogRecord();
+ LogRecordWithDLSN record = Utils.ioResult(futureRecord);
assertEquals(new DLSN(2, 0, 0), record.getDlsn());
}
@@ -164,10 +165,10 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
String dlName = runtime.getMethodName();
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(DLSN.InitialDLSN);
try {
- Await.result(count);
+ Utils.ioResult(count);
fail("log is empty, should have returned log empty ex");
} catch (LogNotFoundException ex) {
}
@@ -179,9 +180,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
prepareLogSegmentsNonPartitioned(dlName, 11, 3);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(DLSN.InitialDLSN);
- assertEquals(33, Await.result(count).longValue());
+ assertEquals(33, Utils.ioResult(count).longValue());
}
@Test(timeout = 60000)
@@ -190,11 +191,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
prepareLogSegmentsNonPartitioned(dlName, 11, 3);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(2, 0, 0));
- assertEquals(30, Await.result(count).longValue());
+ assertEquals(30, Utils.ioResult(count).longValue());
count = readHandler.asyncGetLogRecordCount(new DLSN(3, 0, 0));
- assertEquals(27, Await.result(count).longValue());
+ assertEquals(27, Utils.ioResult(count).longValue());
}
@Test(timeout = 60000)
@@ -203,9 +204,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
prepareLogSegmentsNonPartitioned(dlName, 11, 3);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(12, 0, 0));
- assertEquals(0, Await.result(count).longValue());
+ assertEquals(0, Utils.ioResult(count).longValue());
}
@Test(timeout = 60000)
@@ -214,9 +215,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
prepareLogSegmentsNonPartitioned(dlName, 11, 3);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(11, 2, 0));
- assertEquals(1, Await.result(count).longValue());
+ assertEquals(1, Utils.ioResult(count).longValue());
}
@Test(timeout = 60000)
@@ -225,11 +226,11 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
prepareLogSegmentsNonPartitioned(dlName, 5, 10);
DistributedLogManager dlm = createNewDLM(conf, dlName);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(3, 5, 0));
- assertEquals(25, Await.result(count).longValue());
+ assertEquals(25, Utils.ioResult(count).longValue());
count = readHandler.asyncGetLogRecordCount(new DLSN(2, 5, 0));
- assertEquals(35, Await.result(count).longValue());
+ assertEquals(35, Utils.ioResult(count).longValue());
}
@Test(timeout = 60000)
@@ -239,9 +240,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 5, txid);
txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 0, 10, txid);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
- assertEquals(15, Await.result(count).longValue());
+ assertEquals(15, Utils.ioResult(count).longValue());
}
@Test(timeout = 60000)
@@ -251,9 +252,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 5, 0, txid);
txid += DLMTestUtil.generateLogSegmentNonPartitioned(dlm, 10, 0, txid);
BKLogReadHandler readHandler = ((BKDistributedLogManager) dlm).createReadHandler();
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
- assertEquals(0, Await.result(count).longValue());
+ assertEquals(0, Utils.ioResult(count).longValue());
}
@Test(timeout = 60000)
@@ -264,12 +265,12 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned();
int txid = 1;
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
BKLogReadHandler readHandler = bkdlm.createReadHandler();
- List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+ List<LogSegmentMetadata> ledgerList = Utils.ioResult(
readHandler.readLogSegmentsFromStore(
LogSegmentMetadata.COMPARATOR,
LogSegmentFilter.DEFAULT_FILTER,
@@ -279,9 +280,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
assertEquals(1, ledgerList.size());
assertTrue(ledgerList.get(0).isInProgress());
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
- assertEquals(2, Await.result(count).longValue());
+ assertEquals(2, Utils.ioResult(count).longValue());
Utils.close(out);
}
@@ -294,12 +295,12 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
long txid = 1;
txid += DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, txid);
AsyncLogWriter out = bkdlm.startAsyncLogSegmentNonPartitioned();
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
- Await.result(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
+ Utils.ioResult(out.write(DLMTestUtil.getLargeLogRecordInstance(txid++, false)));
BKLogReadHandler readHandler = bkdlm.createReadHandler();
- List<LogSegmentMetadata> ledgerList = FutureUtils.result(
+ List<LogSegmentMetadata> ledgerList = Utils.ioResult(
readHandler.readLogSegmentsFromStore(
LogSegmentMetadata.COMPARATOR,
LogSegmentFilter.DEFAULT_FILTER,
@@ -309,9 +310,9 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
assertFalse(ledgerList.get(0).isInProgress());
assertTrue(ledgerList.get(1).isInProgress());
- Future<Long> count = null;
+ CompletableFuture<Long> count = null;
count = readHandler.asyncGetLogRecordCount(new DLSN(1, 0, 0));
- assertEquals(7, Await.result(count).longValue());
+ assertEquals(7, Utils.ioResult(count).longValue());
Utils.close(out);
}
@@ -322,14 +323,14 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
BKDistributedLogManager bkdlm = (BKDistributedLogManager) createNewDLM(conf, streamName);
BKLogReadHandler readHandler = bkdlm.createReadHandler();
try {
- Await.result(readHandler.lockStream());
+ Utils.ioResult(readHandler.lockStream());
fail("Should fail lock stream if log not found");
} catch (LogNotFoundException ex) {
}
BKLogReadHandler subscriberReadHandler = bkdlm.createReadHandler(Optional.of("test-subscriber"));
try {
- Await.result(subscriberReadHandler.lockStream());
+ Utils.ioResult(subscriberReadHandler.lockStream());
fail("Subscriber should fail lock stream if log not found");
} catch (LogNotFoundException ex) {
// expected
@@ -342,17 +343,17 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
BKDistributedLogManager bkdlm = createNewDLM(conf, streamName);
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1);
BKLogReadHandler readHandler = bkdlm.createReadHandler();
- Await.result(readHandler.lockStream());
+ Utils.ioResult(readHandler.lockStream());
// two subscribers could lock stream in parallel
BKDistributedLogManager bkdlm10 = createNewDLM(conf, streamName);
BKLogReadHandler s10Handler =
bkdlm10.createReadHandler(Optional.of("s1"));
- Await.result(s10Handler.lockStream());
+ Utils.ioResult(s10Handler.lockStream());
BKDistributedLogManager bkdlm20 = createNewDLM(conf, streamName);
BKLogReadHandler s20Handler =
bkdlm20.createReadHandler(Optional.of("s2"));
- Await.result(s20Handler.lockStream());
+ Utils.ioResult(s20Handler.lockStream());
readHandler.asyncClose();
bkdlm.close();
@@ -368,19 +369,19 @@ public class TestBKLogReadHandler extends TestDistributedLogBase {
BKDistributedLogManager bkdlm = createNewDLM(conf, streamName);
DLMTestUtil.generateLogSegmentNonPartitioned(bkdlm, 0, 5, 1);
BKLogReadHandler readHandler = bkdlm.createReadHandler();
- Await.result(readHandler.lockStream());
+ Utils.ioResult(readHandler.lockStream());
// same subscrbiers couldn't lock stream in parallel
BKDistributedLogManager bkdlm10 = createNewDLM(conf, streamName);
BKLogReadHandler s10Handler =
bkdlm10.createReadHandler(Optional.of("s1"));
- Await.result(s10Handler.lockStream());
+ Utils.ioResult(s10Handler.lockStream());
BKDistributedLogManager bkdlm11 = createNewDLM(conf, streamName);
BKLogReadHandler s11Handler =
bkdlm11.createReadHandler(Optional.of("s1"));
try {
- Await.result(s11Handler.lockStream(), Duration.apply(10000, TimeUnit.MILLISECONDS));
+ Utils.ioResult(s11Handler.lockStream(), 10000, TimeUnit.MILLISECONDS);
fail("Should fail lock stream using same subscriber id");
} catch (OwnershipAcquireFailedException oafe) {
// expected