You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:48 UTC
[16/23] incubator-distributedlog git commit: DL-124: Use Java8 Future
rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
index fdb29f3..1293d00 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java
@@ -17,9 +17,25 @@
*/
package org.apache.distributedlog;
+import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.stats.AlertStatsLogger;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.versioning.Version;
+import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.DLIllegalStateException;
import org.apache.distributedlog.exceptions.EndOfStreamException;
@@ -43,34 +59,14 @@ import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.util.Allocator;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.FutureUtils.FutureEventListenerRunnable;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Transaction;
-import org.apache.distributedlog.util.PermitLimiter;
+import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.apache.bookkeeper.feature.FeatureProvider;
-import org.apache.bookkeeper.stats.AlertStatsLogger;
-import org.apache.bookkeeper.stats.OpStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.distributedlog.impl.ZKLogSegmentFilters.WRITE_HANDLE_FILTER;
/**
* Log Handler for Writers.
@@ -108,7 +104,7 @@ class BKLogWriteHandler extends BKLogHandler {
protected final boolean validateLogSegmentSequenceNumber;
protected final int regionId;
protected final RollingPolicy rollingPolicy;
- protected Future<? extends DistributedLock> lockFuture = null;
+ protected CompletableFuture<? extends DistributedLock> lockFuture = null;
protected final PermitLimiter writeLimiter;
protected final FeatureProvider featureProvider;
protected final DynamicDistributedLogConfiguration dynConf;
@@ -117,16 +113,16 @@ class BKLogWriteHandler extends BKLogHandler {
protected final LinkedList<Long> inprogressLSSNs;
// Fetch LogSegments State: write can continue without full list of log segments while truncation needs
- private final Future<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
- private Future<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;
+ private final CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForWrite;
+ private CompletableFuture<Versioned<List<LogSegmentMetadata>>> fetchForTruncation;
// Recover Functions
private final RecoverLogSegmentFunction recoverLogSegmentFunction =
new RecoverLogSegmentFunction();
- private final AbstractFunction1<List<LogSegmentMetadata>, Future<Long>> recoverLogSegmentsFunction =
- new AbstractFunction1<List<LogSegmentMetadata>, Future<Long>>() {
+ private final Function<List<LogSegmentMetadata>, CompletableFuture<Long>> recoverLogSegmentsFunction =
+ new Function<List<LogSegmentMetadata>, CompletableFuture<Long>>() {
@Override
- public Future<Long> apply(List<LogSegmentMetadata> segmentList) {
+ public CompletableFuture<Long> apply(List<LogSegmentMetadata> segmentList) {
LOG.info("Initiating Recovery For {} : {}", getFullyQualifiedName(), segmentList);
// if lastLedgerRollingTimeMillis is not updated, we set it to now.
synchronized (BKLogWriteHandler.this) {
@@ -145,8 +141,11 @@ class BKLogWriteHandler extends BKLogHandler {
}
}
- return FutureUtils.processList(segmentList, recoverLogSegmentFunction, scheduler).map(
- GetLastTxIdFunction.INSTANCE);
+ return FutureUtils.processList(
+ segmentList,
+ recoverLogSegmentFunction,
+ scheduler
+ ).thenApply(GetLastTxIdFunction.INSTANCE);
}
};
@@ -232,30 +231,30 @@ class BKLogWriteHandler extends BKLogHandler {
deleteOpStats = segmentsStatsLogger.getOpStatsLogger("delete");
}
- private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch(
+ private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFetch(
final Comparator<LogSegmentMetadata> comparator) {
- final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
- fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+ final CompletableFuture<List<LogSegmentMetadata>> promise = new CompletableFuture<List<LogSegmentMetadata>>();
+ fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
public void onFailure(Throwable cause) {
- FutureUtils.setException(promise, cause);
+ promise.completeExceptionally(cause);
}
@Override
public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
try {
- FutureUtils.setValue(promise, getCachedLogSegments(comparator));
+ promise.complete(getCachedLogSegments(comparator));
} catch (UnexpectedException e) {
- FutureUtils.setException(promise, e);
+ promise.completeExceptionally(e);
}
}
});
return promise;
}
- private Future<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch(
+ private CompletableFuture<List<LogSegmentMetadata>> getCachedLogSegmentsAfterFirstFullFetch(
final Comparator<LogSegmentMetadata> comparator) {
- Future<Versioned<List<LogSegmentMetadata>>> result;
+ CompletableFuture<Versioned<List<LogSegmentMetadata>>> result;
synchronized (this) {
if (null == fetchForTruncation) {
fetchForTruncation = readLogSegmentsFromStore(
@@ -266,19 +265,19 @@ class BKLogWriteHandler extends BKLogHandler {
result = fetchForTruncation;
}
- final Promise<List<LogSegmentMetadata>> promise = new Promise<List<LogSegmentMetadata>>();
- result.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+ final CompletableFuture<List<LogSegmentMetadata>> promise = new CompletableFuture<List<LogSegmentMetadata>>();
+ result.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
public void onFailure(Throwable cause) {
- FutureUtils.setException(promise, cause);
+ FutureUtils.completeExceptionally(promise, cause);
}
@Override
public void onSuccess(Versioned<List<LogSegmentMetadata>> result) {
try {
- FutureUtils.setValue(promise, getCachedLogSegments(comparator));
+ FutureUtils.complete(promise, getCachedLogSegments(comparator));
} catch (UnexpectedException e) {
- FutureUtils.setException(promise, e);
+ FutureUtils.completeExceptionally(promise, e);
}
}
});
@@ -374,7 +373,7 @@ class BKLogWriteHandler extends BKLogHandler {
*
* @return future represents the lock result
*/
- Future<? extends DistributedLock> lockHandler() {
+ CompletableFuture<? extends DistributedLock> lockHandler() {
if (null != lockFuture) {
return lockFuture;
}
@@ -382,11 +381,11 @@ class BKLogWriteHandler extends BKLogHandler {
return lockFuture;
}
- Future<Void> unlockHandler() {
+ CompletableFuture<Void> unlockHandler() {
if (null != lockFuture) {
return lock.asyncClose();
} else {
- return Future.Void();
+ return FutureUtils.Void();
}
}
@@ -483,23 +482,23 @@ class BKLogWriteHandler extends BKLogHandler {
}
protected BKLogSegmentWriter doStartLogSegment(long txId, boolean bestEffort, boolean allowMaxTxID) throws IOException {
- return FutureUtils.result(asyncStartLogSegment(txId, bestEffort, allowMaxTxID));
+ return Utils.ioResult(asyncStartLogSegment(txId, bestEffort, allowMaxTxID));
}
- protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
+ protected CompletableFuture<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
final boolean bestEffort,
final boolean allowMaxTxID) {
- final Promise<BKLogSegmentWriter> promise = new Promise<BKLogSegmentWriter>();
+ final CompletableFuture<BKLogSegmentWriter> promise = new CompletableFuture<BKLogSegmentWriter>();
try {
lock.checkOwnershipAndReacquire();
} catch (LockingException e) {
- FutureUtils.setException(promise, e);
+ FutureUtils.completeExceptionally(promise, e);
return promise;
}
- fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+ fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
public void onFailure(Throwable cause) {
- FutureUtils.setException(promise, cause);
+ FutureUtils.completeExceptionally(promise, cause);
}
@Override
@@ -513,11 +512,11 @@ class BKLogWriteHandler extends BKLogHandler {
protected void doStartLogSegment(final long txId,
final boolean bestEffort,
final boolean allowMaxTxID,
- final Promise<BKLogSegmentWriter> promise) {
+ final CompletableFuture<BKLogSegmentWriter> promise) {
// validate the tx id
if ((txId < 0) ||
(!allowMaxTxID && (txId == DistributedLogConstants.MAX_TXID))) {
- FutureUtils.setException(promise, new IOException("Invalid Transaction Id " + txId));
+ FutureUtils.completeExceptionally(promise, new IOException("Invalid Transaction Id " + txId));
return;
}
@@ -525,11 +524,11 @@ class BKLogWriteHandler extends BKLogHandler {
if (txId < highestTxIdWritten) {
if (highestTxIdWritten == DistributedLogConstants.MAX_TXID) {
LOG.error("We've already marked the stream as ended and attempting to start a new log segment");
- FutureUtils.setException(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
+ FutureUtils.completeExceptionally(promise, new EndOfStreamException("Writing to a stream after it has been marked as completed"));
return;
} else {
LOG.error("We've already seen TxId {} the max TXId is {}", txId, highestTxIdWritten);
- FutureUtils.setException(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
+ FutureUtils.completeExceptionally(promise, new TransactionIdOutOfOrderException(txId, highestTxIdWritten));
return;
}
}
@@ -554,7 +553,7 @@ class BKLogWriteHandler extends BKLogHandler {
}
logSegmentAllocator.tryObtain(txn, NULL_OP_LISTENER)
- .addEventListener(new FutureEventListener<LogSegmentEntryWriter>() {
+ .whenComplete(new FutureEventListener<LogSegmentEntryWriter>() {
@Override
public void onSuccess(LogSegmentEntryWriter entryWriter) {
@@ -574,13 +573,13 @@ class BKLogWriteHandler extends BKLogHandler {
});
}
- private void failStartLogSegment(Promise<BKLogSegmentWriter> promise,
+ private void failStartLogSegment(CompletableFuture<BKLogSegmentWriter> promise,
boolean bestEffort,
Throwable cause) {
if (bestEffort) {
- FutureUtils.setValue(promise, null);
+ FutureUtils.complete(promise, null);
} else {
- FutureUtils.setException(promise, cause);
+ FutureUtils.completeExceptionally(promise, cause);
}
}
@@ -591,7 +590,7 @@ class BKLogWriteHandler extends BKLogHandler {
final long txId,
final LogSegmentEntryWriter entryWriter,
boolean bestEffort,
- final Promise<BKLogSegmentWriter> promise) {
+ final CompletableFuture<BKLogSegmentWriter> promise) {
final long logSegmentSeqNo;
try {
FailpointUtils.checkFailPoint(
@@ -626,12 +625,12 @@ class BKLogWriteHandler extends BKLogHandler {
LOG.debug("Try storing MaxTxId in startLogSegment {} {}", inprogressZnodePath, txId);
storeMaxTxId(txn, maxTxId, txId);
- txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
+ txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
try {
- FutureUtils.setValue(promise, new BKLogSegmentWriter(
+ FutureUtils.complete(promise, new BKLogSegmentWriter(
getFullyQualifiedName(),
l.getSegmentName(),
conf,
@@ -656,7 +655,7 @@ class BKLogWriteHandler extends BKLogHandler {
public void onFailure(Throwable cause) {
failStartLogSegment(promise, false, cause);
}
- }, scheduler));
+ }, scheduler);
}
boolean shouldStartNewSegment(BKLogSegmentWriter writer) {
@@ -672,21 +671,21 @@ class BKLogWriteHandler extends BKLogHandler {
* the firstTxId of the ledger matches firstTxId for the segment we are
* trying to finalize.
*/
- Future<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) {
- final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+ CompletableFuture<LogSegmentMetadata> completeAndCloseLogSegment(final BKLogSegmentWriter writer) {
+ final CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
completeAndCloseLogSegment(writer, promise);
return promise;
}
private void completeAndCloseLogSegment(final BKLogSegmentWriter writer,
- final Promise<LogSegmentMetadata> promise) {
- writer.asyncClose().addEventListener(new FutureEventListener<Void>() {
+ final CompletableFuture<LogSegmentMetadata> promise) {
+ writer.asyncClose().whenComplete(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
// in theory closeToFinalize should throw exception if a stream is in error.
// just in case, add another checking here to make sure we don't close log segment is a stream is in error.
if (writer.shouldFailCompleteLogSegment()) {
- FutureUtils.setException(promise,
+ FutureUtils.completeExceptionally(promise,
new IOException("LogSegmentWriter for " + writer.getFullyQualifiedLogSegment() + " is already in error."));
return;
}
@@ -704,7 +703,7 @@ class BKLogWriteHandler extends BKLogHandler {
@Override
public void onFailure(Throwable cause) {
- FutureUtils.setException(promise, cause);
+ FutureUtils.completeExceptionally(promise, cause);
}
});
}
@@ -791,7 +790,7 @@ class BKLogWriteHandler extends BKLogHandler {
int recordCount,
long lastEntryId,
long lastSlotId) throws IOException {
- Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+ CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
doCompleteAndCloseLogSegment(
inprogressZnodeName,
logSegmentSeqNo,
@@ -802,7 +801,7 @@ class BKLogWriteHandler extends BKLogHandler {
lastEntryId,
lastSlotId,
promise);
- return FutureUtils.result(promise);
+ return Utils.ioResult(promise);
}
protected void doCompleteAndCloseLogSegment(final String inprogressZnodeName,
@@ -813,11 +812,11 @@ class BKLogWriteHandler extends BKLogHandler {
final int recordCount,
final long lastEntryId,
final long lastSlotId,
- final Promise<LogSegmentMetadata> promise) {
- fetchForWrite.addEventListener(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
+ final CompletableFuture<LogSegmentMetadata> promise) {
+ fetchForWrite.whenComplete(new FutureEventListener<Versioned<List<LogSegmentMetadata>>>() {
@Override
public void onFailure(Throwable cause) {
- FutureUtils.setException(promise, cause);
+ FutureUtils.completeExceptionally(promise, cause);
}
@Override
@@ -845,11 +844,11 @@ class BKLogWriteHandler extends BKLogHandler {
int recordCount,
long lastEntryId,
long lastSlotId,
- final Promise<LogSegmentMetadata> promise) {
+ final CompletableFuture<LogSegmentMetadata> promise) {
try {
lock.checkOwnershipAndReacquire();
} catch (IOException ioe) {
- FutureUtils.setException(promise, ioe);
+ FutureUtils.completeExceptionally(promise, ioe);
return;
}
@@ -858,7 +857,7 @@ class BKLogWriteHandler extends BKLogHandler {
// validate log segment
if (inprogressLogSegment.getLogSegmentId() != logSegmentId) {
- FutureUtils.setException(promise, new IOException(
+ FutureUtils.completeExceptionally(promise, new IOException(
"Active ledger has different ID to inprogress. "
+ inprogressLogSegment.getLogSegmentId() + " found, "
+ logSegmentId + " expected"));
@@ -866,7 +865,7 @@ class BKLogWriteHandler extends BKLogHandler {
}
// validate the transaction id
if (inprogressLogSegment.getFirstTxId() != firstTxId) {
- FutureUtils.setException(promise, new IOException("Transaction id not as expected, "
+ FutureUtils.completeExceptionally(promise, new IOException("Transaction id not as expected, "
+ inprogressLogSegment.getFirstTxId() + " found, " + firstTxId + " expected"));
return;
}
@@ -874,7 +873,7 @@ class BKLogWriteHandler extends BKLogHandler {
if (validateLogSegmentSequenceNumber) {
synchronized (inprogressLSSNs) {
if (inprogressLSSNs.isEmpty()) {
- FutureUtils.setException(promise, new UnexpectedException(
+ FutureUtils.completeExceptionally(promise, new UnexpectedException(
"Didn't find matched inprogress log segments when completing inprogress "
+ inprogressLogSegment));
return;
@@ -886,7 +885,7 @@ class BKLogWriteHandler extends BKLogHandler {
// it should also be same as the least inprogress log segment sequence number tracked in {@link inprogressLSSNs}
if ((inprogressLogSegment.getLogSegmentSequenceNumber() != logSegmentSeqNo) ||
(leastInprogressLSSN != logSegmentSeqNo)) {
- FutureUtils.setException(promise, new UnexpectedException(
+ FutureUtils.completeExceptionally(promise, new UnexpectedException(
"Didn't find matched inprogress log segments when completing inprogress "
+ inprogressLogSegment));
return;
@@ -906,7 +905,7 @@ class BKLogWriteHandler extends BKLogHandler {
LOG.warn("Unexpected max ledger sequence number {} found while completing log segment {} for {}",
new Object[] { maxLogSegmentSequenceNo.getSequenceNumber(), logSegmentSeqNo, getFullyQualifiedName() });
if (validateLogSegmentSequenceNumber) {
- FutureUtils.setException(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
+ FutureUtils.completeExceptionally(promise, new DLIllegalStateException("Unexpected max log segment sequence number "
+ maxLogSegmentSequenceNo.getSequenceNumber() + " for " + getFullyQualifiedName()
+ ", expected " + (logSegmentSeqNo - 1)));
return;
@@ -919,7 +918,7 @@ class BKLogWriteHandler extends BKLogHandler {
try {
startSequenceId = computeStartSequenceId(inprogressLogSegment);
} catch (IOException ioe) {
- FutureUtils.setException(promise, ioe);
+ FutureUtils.completeExceptionally(promise, ioe);
return;
}
// write completed ledger znode
@@ -946,50 +945,45 @@ class BKLogWriteHandler extends BKLogHandler {
LOG.debug("Trying storing LastTxId in Finalize Path {} LastTxId {}", pathForCompletedLedger, lastTxId);
storeMaxTxId(txn, maxTxId, lastTxId);
- txn.execute().addEventListener(FutureEventListenerRunnable.of(new FutureEventListener<Void>() {
+ txn.execute().whenCompleteAsync(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
LOG.info("Completed {} to {} for {} : {}",
new Object[] { inprogressZnodeName, completedLogSegment.getSegmentName(),
getFullyQualifiedName(), completedLogSegment });
- FutureUtils.setValue(promise, completedLogSegment);
+ FutureUtils.complete(promise, completedLogSegment);
}
@Override
public void onFailure(Throwable cause) {
- FutureUtils.setException(promise, cause);
+ FutureUtils.completeExceptionally(promise, cause);
}
- }, scheduler));
+ }, scheduler);
}
- public Future<Long> recoverIncompleteLogSegments() {
+ public CompletableFuture<Long> recoverIncompleteLogSegments() {
try {
FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_RecoverIncompleteLogSegments);
} catch (IOException ioe) {
- return Future.exception(ioe);
+ return FutureUtils.exception(ioe);
}
- return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).flatMap(recoverLogSegmentsFunction);
+ return getCachedLogSegmentsAfterFirstFetch(LogSegmentMetadata.COMPARATOR).thenCompose(recoverLogSegmentsFunction);
}
- class RecoverLogSegmentFunction extends Function<LogSegmentMetadata, Future<LogSegmentMetadata>> {
+ class RecoverLogSegmentFunction implements Function<LogSegmentMetadata, CompletableFuture<LogSegmentMetadata>> {
@Override
- public Future<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
+ public CompletableFuture<LogSegmentMetadata> apply(final LogSegmentMetadata l) {
if (!l.isInProgress()) {
- return Future.value(l);
+ return FutureUtils.value(l);
}
LOG.info("Recovering last record in log segment {} for {}.", l, getFullyQualifiedName());
- return asyncReadLastRecord(l, true, true, true).flatMap(
- new AbstractFunction1<LogRecordWithDLSN, Future<LogSegmentMetadata>>() {
- @Override
- public Future<LogSegmentMetadata> apply(LogRecordWithDLSN lastRecord) {
- return completeLogSegment(l, lastRecord);
- }
- });
+ return asyncReadLastRecord(l, true, true, true).thenCompose(
+ lastRecord -> completeLogSegment(l, lastRecord));
}
- private Future<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
+ private CompletableFuture<LogSegmentMetadata> completeLogSegment(LogSegmentMetadata l,
LogRecordWithDLSN lastRecord) {
LOG.info("Recovered last record in log segment {} for {}.", l, getFullyQualifiedName());
@@ -1009,14 +1003,14 @@ class BKLogWriteHandler extends BKLogHandler {
LOG.error("Unrecoverable corruption has occurred in segment "
+ l.toString() + " at path " + l.getZkPath()
+ ". Unable to continue recovery.");
- return Future.exception(new IOException("Unrecoverable corruption,"
+ return FutureUtils.exception(new IOException("Unrecoverable corruption,"
+ " please check logs."));
} else if (endTxId == DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID) {
// TODO: Empty ledger - Ideally we should just remove it?
endTxId = l.getFirstTxId();
}
- Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+ CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
doCompleteAndCloseLogSegment(
l.getZNodeName(),
l.getLogSegmentSequenceNumber(),
@@ -1032,21 +1026,16 @@ class BKLogWriteHandler extends BKLogHandler {
}
- Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
+ CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(final DLSN dlsn) {
if (DLSN.InvalidDLSN == dlsn) {
List<LogSegmentMetadata> emptyList = new ArrayList<LogSegmentMetadata>(0);
- return Future.value(emptyList);
+ return FutureUtils.value(emptyList);
}
- return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
- new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
- @Override
- public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
- return setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn);
- }
- });
+ return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
+ logSegments -> setLogSegmentsOlderThanDLSNTruncated(logSegments, dlsn));
}
- private Future<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
+ private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentsOlderThanDLSNTruncated(List<LogSegmentMetadata> logSegments,
final DLSN dlsn) {
LOG.debug("Setting truncation status on logs older than {} from {} for {}",
new Object[]{dlsn, logSegments, getFullyQualifiedName()});
@@ -1064,7 +1053,7 @@ class BKLogWriteHandler extends BKLogHandler {
if (null != partialTruncate) {
String logMsg = String.format("Potential metadata inconsistency for stream %s at segment %s", getFullyQualifiedName(), l);
LOG.error(logMsg);
- return Future.exception(new DLIllegalStateException(logMsg));
+ return FutureUtils.exception(new DLIllegalStateException(logMsg));
}
LOG.info("{}: Partially truncating log segment {} older than {}.", new Object[] {getFullyQualifiedName(), l, dlsn});
partialTruncate = l;
@@ -1096,15 +1085,15 @@ class BKLogWriteHandler extends BKLogHandler {
}
}
- Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
+ CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTimestamp(final long minTimestampToKeep) {
if (minTimestampToKeep >= Utils.nowInMillis()) {
- return Future.exception(new IllegalArgumentException(
+ return FutureUtils.exception(new IllegalArgumentException(
"Invalid timestamp " + minTimestampToKeep + " to purge logs for " + getFullyQualifiedName()));
}
- return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
- new Function<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
+ return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
+ new Function<List<LogSegmentMetadata>, CompletableFuture<List<LogSegmentMetadata>>>() {
@Override
- public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
+ public CompletableFuture<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
List<LogSegmentMetadata> purgeList = new ArrayList<LogSegmentMetadata>(logSegments.size());
int numCandidates = getNumCandidateLogSegmentsToPurge(logSegments);
@@ -1129,38 +1118,35 @@ class BKLogWriteHandler extends BKLogHandler {
});
}
- Future<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
- return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).flatMap(
- new AbstractFunction1<List<LogSegmentMetadata>, Future<List<LogSegmentMetadata>>>() {
- @Override
- public Future<List<LogSegmentMetadata>> apply(List<LogSegmentMetadata> logSegments) {
- int numLogSegmentsToProcess;
+ CompletableFuture<List<LogSegmentMetadata>> purgeLogSegmentsOlderThanTxnId(final long minTxIdToKeep) {
+ return getCachedLogSegmentsAfterFirstFullFetch(LogSegmentMetadata.COMPARATOR).thenCompose(
+ logSegments -> {
+ int numLogSegmentsToProcess;
- if (minTxIdToKeep < 0) {
- // we are deleting the log, we can remove whole log segments
- numLogSegmentsToProcess = logSegments.size();
+ if (minTxIdToKeep < 0) {
+ // we are deleting the log, we can remove whole log segments
+ numLogSegmentsToProcess = logSegments.size();
+ } else {
+ numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
+ }
+ List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
+ for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
+ LogSegmentMetadata l = logSegments.get(iterator);
+ if ((minTxIdToKeep < 0) ||
+ ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
+ !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
+ purgeList.add(l);
} else {
- numLogSegmentsToProcess = getNumCandidateLogSegmentsToPurge(logSegments);
- }
- List<LogSegmentMetadata> purgeList = Lists.newArrayListWithExpectedSize(numLogSegmentsToProcess);
- for (int iterator = 0; iterator < numLogSegmentsToProcess; iterator++) {
- LogSegmentMetadata l = logSegments.get(iterator);
- if ((minTxIdToKeep < 0) ||
- ((l.isTruncated() || !conf.getExplicitTruncationByApplication()) &&
- !l.isInProgress() && (l.getLastTxId() < minTxIdToKeep))) {
- purgeList.add(l);
- } else {
- // stop truncating log segments if we find either an inprogress or a partially
- // truncated log segment
- break;
- }
+ // stop truncating log segments if we find either an inprogress or a partially
+ // truncated log segment
+ break;
}
- return deleteLogSegments(purgeList);
}
+ return deleteLogSegments(purgeList);
});
}
- private Future<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
+ private CompletableFuture<List<LogSegmentMetadata>> setLogSegmentTruncationStatus(
final List<LogSegmentMetadata> truncateList,
LogSegmentMetadata partialTruncate,
DLSN minActiveDLSN) {
@@ -1183,39 +1169,31 @@ class BKLogWriteHandler extends BKLogHandler {
listAfterTruncated.add(newSegment);
}
- return updateTxn.execute().map(new AbstractFunction1<Void, List<LogSegmentMetadata>>() {
- @Override
- public List<LogSegmentMetadata> apply(Void value) {
- for (int i = 0; i < listToTruncate.size(); i++) {
- removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
- LogSegmentMetadata newSegment = listAfterTruncated.get(i);
- addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
- }
- return listAfterTruncated;
+ return updateTxn.execute().thenApply(value -> {
+ for (int i = 0; i < listToTruncate.size(); i++) {
+ removeLogSegmentFromCache(listToTruncate.get(i).getSegmentName());
+ LogSegmentMetadata newSegment = listAfterTruncated.get(i);
+ addLogSegmentToCache(newSegment.getSegmentName(), newSegment);
}
+ return listAfterTruncated;
});
}
- private Future<List<LogSegmentMetadata>> deleteLogSegments(
+ private CompletableFuture<List<LogSegmentMetadata>> deleteLogSegments(
final List<LogSegmentMetadata> logs) {
if (LOG.isTraceEnabled()) {
LOG.trace("Purging logs for {} : {}", getFullyQualifiedName(), logs);
}
return FutureUtils.processList(logs,
- new Function<LogSegmentMetadata, Future<LogSegmentMetadata>>() {
- @Override
- public Future<LogSegmentMetadata> apply(LogSegmentMetadata segment) {
- return deleteLogSegment(segment);
- }
- }, scheduler);
+ segment -> deleteLogSegment(segment), scheduler);
}
- private Future<LogSegmentMetadata> deleteLogSegment(
+ private CompletableFuture<LogSegmentMetadata> deleteLogSegment(
final LogSegmentMetadata ledgerMetadata) {
LOG.info("Deleting ledger {} for {}", ledgerMetadata, getFullyQualifiedName());
- final Promise<LogSegmentMetadata> promise = new Promise<LogSegmentMetadata>();
+ final CompletableFuture<LogSegmentMetadata> promise = new CompletableFuture<LogSegmentMetadata>();
final Stopwatch stopwatch = Stopwatch.createStarted();
- promise.addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+ promise.whenComplete(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onSuccess(LogSegmentMetadata segment) {
deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS));
@@ -1227,10 +1205,10 @@ class BKLogWriteHandler extends BKLogHandler {
}
});
entryStore.deleteLogSegment(ledgerMetadata)
- .addEventListener(new FutureEventListener<LogSegmentMetadata>() {
+ .whenComplete(new FutureEventListener<LogSegmentMetadata>() {
@Override
public void onFailure(Throwable cause) {
- FutureUtils.setException(promise, cause);
+ FutureUtils.completeExceptionally(promise, cause);
}
@Override
@@ -1242,14 +1220,14 @@ class BKLogWriteHandler extends BKLogHandler {
}
private void deleteLogSegmentMetadata(final LogSegmentMetadata segmentMetadata,
- final Promise<LogSegmentMetadata> promise) {
+ final CompletableFuture<LogSegmentMetadata> promise) {
Transaction<Object> deleteTxn = metadataStore.transaction();
metadataStore.deleteLogSegment(deleteTxn, segmentMetadata, new Transaction.OpListener<Void>() {
@Override
public void onCommit(Void r) {
// purge log segment
removeLogSegmentFromCache(segmentMetadata.getZNodeName());
- promise.setValue(segmentMetadata);
+ promise.complete(segmentMetadata);
}
@Override
@@ -1257,12 +1235,12 @@ class BKLogWriteHandler extends BKLogHandler {
if (t instanceof LogSegmentNotFoundException) {
// purge log segment
removeLogSegmentFromCache(segmentMetadata.getZNodeName());
- promise.setValue(segmentMetadata);
+ promise.complete(segmentMetadata);
return;
} else {
LOG.error("Couldn't purge {} for {}: with error {}",
new Object[]{ segmentMetadata, getFullyQualifiedName(), t });
- promise.setException(t);
+ promise.completeExceptionally(t);
}
}
});
@@ -1270,14 +1248,14 @@ class BKLogWriteHandler extends BKLogHandler {
}
@Override
- public Future<Void> asyncClose() {
+ public CompletableFuture<Void> asyncClose() {
return Utils.closeSequence(scheduler,
lock,
logSegmentAllocator);
}
@Override
- public Future<Void> asyncAbort() {
+ public CompletableFuture<Void> asyncAbort() {
return asyncClose();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
index bf89823..47301b5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogReader.java
@@ -20,17 +20,15 @@ package org.apache.distributedlog;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Ticker;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.EndOfStreamException;
import org.apache.distributedlog.exceptions.IdleReaderException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.versioning.Versioned;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.util.LinkedList;
@@ -48,7 +46,7 @@ class BKSyncLogReader implements LogReader, AsyncNotification {
private final AtomicReference<IOException> readerException =
new AtomicReference<IOException>(null);
private final int maxReadAheadWaitTime;
- private Promise<Void> closeFuture;
+ private CompletableFuture<Void> closeFuture;
private final Optional<Long> startTransactionId;
private boolean positioned = false;
private Entry.Reader currentEntry = null;
@@ -101,13 +99,10 @@ class BKSyncLogReader implements LogReader, AsyncNotification {
bkdlm.alertStatsLogger);
readHandler.registerListener(readAheadReader);
readHandler.asyncStartFetchLogSegments()
- .map(new AbstractFunction1<Versioned<List<LogSegmentMetadata>>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Versioned<List<LogSegmentMetadata>> logSegments) {
- readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
- readAheadReader.start(logSegments.getValue());
- return BoxedUnit.UNIT;
- }
+ .thenApply(logSegments -> {
+ readAheadReader.addStateChangeNotification(BKSyncLogReader.this);
+ readAheadReader.start(logSegments.getValue());
+ return null;
});
}
@@ -234,26 +229,28 @@ class BKSyncLogReader implements LogReader, AsyncNotification {
}
@Override
- public Future<Void> asyncClose() {
- Promise<Void> closePromise;
+ public CompletableFuture<Void> asyncClose() {
+ CompletableFuture<Void> closePromise;
synchronized (this) {
if (null != closeFuture) {
return closeFuture;
}
- closeFuture = closePromise = new Promise<Void>();
+ closeFuture = closePromise = new CompletableFuture<Void>();
}
readHandler.unregisterListener(readAheadReader);
readAheadReader.removeStateChangeNotification(this);
- Utils.closeSequence(bkdlm.getScheduler(), true,
- readAheadReader,
- readHandler
- ).proxyTo(closePromise);
+ FutureUtils.proxyTo(
+ Utils.closeSequence(bkdlm.getScheduler(), true,
+ readAheadReader,
+ readHandler
+ ),
+ closePromise);
return closePromise;
}
@Override
public void close() throws IOException {
- FutureUtils.result(asyncClose());
+ Utils.ioResult(asyncClose());
}
//
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
index 7d33d12..15296b2 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKSyncLogWriter.java
@@ -17,11 +17,11 @@
*/
package org.apache.distributedlog;
-import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.util.FutureUtils;
-
import java.io.IOException;
import java.util.List;
+import org.apache.distributedlog.api.LogWriter;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.util.Utils;
class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
@@ -59,7 +59,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
*/
@Override
public void markEndOfStream() throws IOException {
- FutureUtils.result(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream());
+ Utils.ioResult(getLedgerWriter(DistributedLogConstants.MAX_TXID, true).markEndOfStream());
closeAndComplete();
}
@@ -73,7 +73,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
long highestTransactionId = 0;
BKLogSegmentWriter writer = getCachedLogWriter();
if (null != writer) {
- highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.flush()));
+ highestTransactionId = Math.max(highestTransactionId, Utils.ioResult(writer.flush()));
}
return highestTransactionId;
}
@@ -93,7 +93,7 @@ class BKSyncLogWriter extends BKAbstractLogWriter implements LogWriter {
long highestTransactionId = 0;
BKLogSegmentWriter writer = getCachedLogWriter();
if (null != writer) {
- highestTransactionId = Math.max(highestTransactionId, FutureUtils.result(writer.commit()));
+ highestTransactionId = Math.max(highestTransactionId, Utils.ioResult(writer.commit()));
LOG.debug("FlushAndSync Completed");
} else {
LOG.debug("FlushAndSync Completed - Nothing to Flush");
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
index 6ed662b..3715327 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKTransmitPacket.java
@@ -17,30 +17,28 @@
*/
package org.apache.distributedlog;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
class BKTransmitPacket {
private final EntryBuffer recordSet;
private final long transmitTime;
- private final Promise<Integer> transmitComplete;
+ private final CompletableFuture<Integer> transmitComplete;
BKTransmitPacket(EntryBuffer recordSet) {
this.recordSet = recordSet;
this.transmitTime = System.nanoTime();
- this.transmitComplete = new Promise<Integer>();
+ this.transmitComplete = new CompletableFuture<Integer>();
}
EntryBuffer getRecordSet() {
return recordSet;
}
- Promise<Integer> getTransmitFuture() {
+ CompletableFuture<Integer> getTransmitFuture() {
return transmitComplete;
}
@@ -53,7 +51,7 @@ class BKTransmitPacket {
* transmit result code.
*/
public void notifyTransmitComplete(int transmitResult) {
- transmitComplete.setValue(transmitResult);
+ transmitComplete.complete(transmitResult);
}
/**
@@ -66,7 +64,7 @@ class BKTransmitPacket {
* @see #awaitTransmitComplete(long, TimeUnit)
*/
void addTransmitCompleteListener(FutureEventListener<Integer> transmitCompleteListener) {
- transmitComplete.addEventListener(transmitCompleteListener);
+ transmitComplete.whenComplete(transmitCompleteListener);
}
/**
@@ -79,8 +77,7 @@ class BKTransmitPacket {
*/
int awaitTransmitComplete(long timeout, TimeUnit unit)
throws Exception {
- return Await.result(transmitComplete,
- Duration.fromTimeUnit(timeout, unit));
+ return FutureUtils.result(transmitComplete, timeout, unit);
}
public long getTransmitTime() {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
index a7b17f4..2ea3b5d 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java
@@ -18,17 +18,7 @@
package org.apache.distributedlog;
import com.google.common.base.Optional;
-import org.apache.distributedlog.ZooKeeperClient.Credentials;
-import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
-import org.apache.distributedlog.exceptions.AlreadyClosedException;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.net.NetUtils;
-import org.apache.distributedlog.util.ConfUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Throw;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -41,6 +31,14 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.distributedlog.ZooKeeperClient.Credentials;
+import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
+import org.apache.distributedlog.exceptions.AlreadyClosedException;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.exceptions.ZKException;
+import org.apache.distributedlog.net.NetUtils;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.zookeeper.KeeperException;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.util.HashedWheelTimer;
@@ -198,52 +196,52 @@ public class BookKeeperClient {
}
// Util functions
- public Future<LedgerHandle> createLedger(int ensembleSize,
- int writeQuorumSize,
- int ackQuorumSize) {
+ public CompletableFuture<LedgerHandle> createLedger(int ensembleSize,
+ int writeQuorumSize,
+ int ackQuorumSize) {
BookKeeper bk;
try {
bk = get();
} catch (IOException ioe) {
- return Future.exception(ioe);
+ return FutureUtils.exception(ioe);
}
- final Promise<LedgerHandle> promise = new Promise<LedgerHandle>();
+ final CompletableFuture<LedgerHandle> promise = new CompletableFuture<LedgerHandle>();
bk.asyncCreateLedger(ensembleSize, writeQuorumSize, ackQuorumSize,
BookKeeper.DigestType.CRC32, passwd, new AsyncCallback.CreateCallback() {
@Override
public void createComplete(int rc, LedgerHandle lh, Object ctx) {
if (BKException.Code.OK == rc) {
- promise.updateIfEmpty(new Return<LedgerHandle>(lh));
+ promise.complete(lh);
} else {
- promise.updateIfEmpty(new Throw<LedgerHandle>(BKException.create(rc)));
+ promise.completeExceptionally(BKException.create(rc));
}
}
}, null);
return promise;
}
- public Future<Void> deleteLedger(long lid,
+ public CompletableFuture<Void> deleteLedger(long lid,
final boolean ignoreNonExistentLedger) {
BookKeeper bk;
try {
bk = get();
} catch (IOException ioe) {
- return Future.exception(ioe);
+ return FutureUtils.exception(ioe);
}
- final Promise<Void> promise = new Promise<Void>();
+ final CompletableFuture<Void> promise = new CompletableFuture<Void>();
bk.asyncDeleteLedger(lid, new AsyncCallback.DeleteCallback() {
@Override
public void deleteComplete(int rc, Object ctx) {
if (BKException.Code.OK == rc) {
- promise.updateIfEmpty(new Return<Void>(null));
+ promise.complete(null);
} else if (BKException.Code.NoSuchLedgerExistsException == rc) {
if (ignoreNonExistentLedger) {
- promise.updateIfEmpty(new Return<Void>(null));
+ promise.complete(null);
} else {
- promise.updateIfEmpty(new Throw<Void>(BKException.create(rc)));
+ promise.completeExceptionally(BKException.create(rc));
}
} else {
- promise.updateIfEmpty(new Throw<Void>(BKException.create(rc)));
+ promise.completeExceptionally(BKException.create(rc));
}
}
}, null);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
index 0cb608f..3269f57 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogConfiguration.java
@@ -22,7 +22,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import org.apache.distributedlog.bk.QuorumConfig;
import org.apache.distributedlog.feature.DefaultFeatureProvider;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.net.DNSResolverForRacks;
import org.apache.distributedlog.net.DNSResolverForRows;
import org.apache.bookkeeper.conf.ClientConfiguration;
@@ -1351,7 +1351,6 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
* <p>
* The setting is only applied for v2 implementation.
*
- * @see org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor
* @return number of resource release threads used by distributedlog namespace.
*/
public int getNumResourceReleaseThreads() {
@@ -3048,7 +3047,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
/**
* Whether to enable per stream stat or not.
*
- * @deprecated please use {@link DistributedLogNamespaceBuilder#perLogStatsLogger(StatsLogger)}
+ * @deprecated please use {@link NamespaceBuilder#perLogStatsLogger(StatsLogger)}
* @return flag to enable per stream stat.
*/
public boolean getEnablePerStreamStat() {
@@ -3058,7 +3057,7 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
/**
* Set the flag to enable per stream stat or not.
*
- * @deprecated please use {@link DistributedLogNamespaceBuilder#perLogStatsLogger(StatsLogger)}
+ * @deprecated please use {@link NamespaceBuilder#perLogStatsLogger(StatsLogger)}
* @param enabled
* flag to enable/disable per stream stat.
* @return dl configuration.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
deleted file mode 100644
index 7d33e9c..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/DistributedLogManager.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.callback.LogSegmentListener;
-import org.apache.distributedlog.io.AsyncCloseable;
-import org.apache.distributedlog.namespace.NamespaceDriver;
-import org.apache.distributedlog.subscription.SubscriptionStateStore;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
-import com.twitter.util.Future;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * A DistributedLogManager is responsible for managing a single place of storing
- * edit logs. It may correspond to multiple files, a backup node, etc.
- * Even when the actual underlying storage is rolled, or failed and restored,
- * each conceptual place of storage corresponds to exactly one instance of
- * this class, which is created when the EditLog is first opened.
- */
-public interface DistributedLogManager extends AsyncCloseable, Closeable {
-
- /**
- * Get the name of the stream managed by this log manager
- * @return streamName
- */
- public String getStreamName();
-
- /**
- * Get the namespace driver used by this manager.
- *
- * @return the namespace driver
- */
- public NamespaceDriver getNamespaceDriver();
-
- /**
- * Get log segments.
- *
- * @return log segments
- * @throws IOException
- */
- public List<LogSegmentMetadata> getLogSegments() throws IOException;
-
- /**
- * Register <i>listener</i> on log segment updates of this stream.
- *
- * @param listener
- * listener to receive update log segment list.
- */
- public void registerListener(LogSegmentListener listener) throws IOException ;
-
- /**
- * Unregister <i>listener</i> on log segment updates from this stream.
- *
- * @param listener
- * listener to receive update log segment list.
- */
- public void unregisterListener(LogSegmentListener listener);
-
- /**
- * Open async log writer to write records to the log stream.
- *
- * @return result represents the open result
- */
- public Future<AsyncLogWriter> openAsyncLogWriter();
-
- /**
- * Begin writing to the log stream identified by the name
- *
- * @return the writer interface to generate log records
- */
- public LogWriter startLogSegmentNonPartitioned() throws IOException;
-
- /**
- * Begin writing to the log stream identified by the name
- *
- * @return the writer interface to generate log records
- */
- // @Deprecated
- public AsyncLogWriter startAsyncLogSegmentNonPartitioned() throws IOException;
-
- /**
- * Begin appending to the end of the log stream which is being treated as a sequence of bytes
- *
- * @return the writer interface to generate log records
- */
- public AppendOnlyStreamWriter getAppendOnlyStreamWriter() throws IOException;
-
- /**
- * Get a reader to read a log stream as a sequence of bytes
- *
- * @return the writer interface to generate log records
- */
- public AppendOnlyStreamReader getAppendOnlyStreamReader() throws IOException;
-
- /**
- * Get the input stream starting with fromTxnId for the specified log
- *
- * @param fromTxnId - the first transaction id we want to read
- * @return the stream starting with transaction fromTxnId
- * @throws IOException if a stream cannot be found.
- */
- public LogReader getInputStream(long fromTxnId)
- throws IOException;
-
- public LogReader getInputStream(DLSN fromDLSN) throws IOException;
-
- /**
- * Open an async log reader to read records from a log starting from <code>fromTxnId</code>.
- *
- * @param fromTxnId
- * transaction id to start reading from
- * @return async log reader
- */
- public Future<AsyncLogReader> openAsyncLogReader(long fromTxnId);
-
- /**
- * Open an async log reader to read records from a log starting from <code>fromDLSN</code>
- *
- * @param fromDLSN
- * dlsn to start reading from
- * @return async log reader
- */
- public Future<AsyncLogReader> openAsyncLogReader(DLSN fromDLSN);
-
- // @Deprecated
- public AsyncLogReader getAsyncLogReader(long fromTxnId) throws IOException;
-
- // @Deprecated
- public AsyncLogReader getAsyncLogReader(DLSN fromDLSN) throws IOException;
-
- public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN);
-
- /**
- * Get a log reader with lock starting from <i>fromDLSN</i> and using <i>subscriberId</i>.
- * If two readers tried to open using same subscriberId, one would succeed, while the other
- * will be blocked until it gets the lock.
- *
- * @param fromDLSN
- * start dlsn
- * @param subscriberId
- * subscriber id
- * @return async log reader
- */
- public Future<AsyncLogReader> getAsyncLogReaderWithLock(DLSN fromDLSN, String subscriberId);
-
- /**
- * Get a log reader using <i>subscriberId</i> with lock. The reader will start reading from
- * its last commit position recorded in subscription store. If no last commit position found
- * in subscription store, it would start reading from head of the stream.
- *
- * If the two readers tried to open using same subscriberId, one would succeed, while the other
- * will be blocked until it gets the lock.
- *
- * @param subscriberId
- * subscriber id
- * @return async log reader
- */
- public Future<AsyncLogReader> getAsyncLogReaderWithLock(String subscriberId);
-
- /**
- * Get the {@link DLSN} of first log record whose transaction id is not less than <code>transactionId</code>.
- *
- * @param transactionId
- * transaction id
- * @return dlsn of first log record whose transaction id is not less than transactionId.
- */
- public Future<DLSN> getDLSNNotLessThanTxId(long transactionId);
-
- /**
- * Get the last log record in the stream
- *
- * @return the last log record in the stream
- * @throws IOException if a stream cannot be found.
- */
- public LogRecordWithDLSN getLastLogRecord()
- throws IOException;
-
- /**
- * Get the earliest Transaction Id available in the log
- *
- * @return earliest transaction id
- * @throws IOException
- */
- public long getFirstTxId() throws IOException;
-
- /**
- * Get Latest Transaction Id in the log
- *
- * @return latest transaction id
- * @throws IOException
- */
- public long getLastTxId() throws IOException;
-
- /**
- * Get Latest DLSN in the log
- *
- * @return last dlsn
- * @throws IOException
- */
- public DLSN getLastDLSN() throws IOException;
-
- /**
- * Get Latest log record with DLSN in the log - async
- *
- * @return latest log record with DLSN
- */
- public Future<LogRecordWithDLSN> getLastLogRecordAsync();
-
- /**
- * Get Latest Transaction Id in the log - async
- *
- * @return latest transaction id
- */
- public Future<Long> getLastTxIdAsync();
-
- /**
- * Get first DLSN in the log.
- *
- * @return first dlsn in the stream
- */
- public Future<DLSN> getFirstDLSNAsync();
-
- /**
- * Get Latest DLSN in the log - async
- *
- * @return latest transaction id
- */
- public Future<DLSN> getLastDLSNAsync();
-
- /**
- * Get the number of log records in the active portion of the log
- * Any log segments that have already been truncated will not be included
- *
- * @return number of log records
- * @throws IOException
- */
- public long getLogRecordCount() throws IOException;
-
- /**
- * Get the number of log records in the active portion of the log - async.
- * Any log segments that have already been truncated will not be included
- *
- * @return future number of log records
- * @throws IOException
- */
- public Future<Long> getLogRecordCountAsync(final DLSN beginDLSN);
-
- /**
- * Run recovery on the log.
- *
- * @throws IOException
- */
- public void recover() throws IOException;
-
- /**
- * Check if an end of stream marker was added to the stream
- * A stream with an end of stream marker cannot be appended to
- *
- * @return true if the marker was added to the stream, false otherwise
- * @throws IOException
- */
- public boolean isEndOfStreamMarked() throws IOException;
-
- /**
- * Delete the log.
- *
- * @throws IOException if the deletion fails
- */
- public void delete() throws IOException;
-
- /**
- * The DistributedLogManager may archive/purge any logs for transactionId
- * less than or equal to minImageTxId.
- * This is to be used only when the client explicitly manages deletion. If
- * the cleanup policy is based on sliding time window, then this method need
- * not be called.
- *
- * @param minTxIdToKeep the earliest txid that must be retained
- * @throws IOException if purging fails
- */
- public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
-
- /**
- * Get the subscriptions store provided by the distributedlog manager.
- *
- * @return subscriptions store manages subscriptions for current stream.
- */
- public SubscriptionsStore getSubscriptionsStore();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
index 617282c..30cd499 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/Entry.java
@@ -19,10 +19,10 @@ package org.apache.distributedlog;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -342,7 +342,7 @@ public class Entry {
* @throws LogRecordTooLongException if the record is too long
* @throws WriteException when encountered exception writing the record
*/
- void writeRecord(LogRecord record, Promise<DLSN> transmitPromise)
+ void writeRecord(LogRecord record, CompletableFuture<DLSN> transmitPromise)
throws LogRecordTooLongException, WriteException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
index aed47fc..09301aa 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntry.java
@@ -30,10 +30,10 @@ import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations.Compression;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations.Compression;
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.io.CompressionUtils;
-import org.apache.distributedlog.util.BitMaskUtils;
+import org.apache.distributedlog.common.util.BitMaskUtils;
/**
* An enveloped entry written to BookKeeper.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
index 54858d7..18645d4 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/EnvelopedEntryWriter.java
@@ -17,6 +17,7 @@
*/
package org.apache.distributedlog;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.Entry.Writer;
import org.apache.distributedlog.exceptions.InvalidEnvelopedEntryException;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
@@ -24,7 +25,6 @@ import org.apache.distributedlog.exceptions.WriteCancelledException;
import org.apache.distributedlog.exceptions.WriteException;
import org.apache.distributedlog.io.Buffer;
import org.apache.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,9 +46,9 @@ class EnvelopedEntryWriter implements Writer {
private static class WriteRequest {
private final int numRecords;
- private final Promise<DLSN> promise;
+ private final CompletableFuture<DLSN> promise;
- WriteRequest(int numRecords, Promise<DLSN> promise) {
+ WriteRequest(int numRecords, CompletableFuture<DLSN> promise) {
this.numRecords = numRecords;
this.promise = promise;
}
@@ -89,7 +89,7 @@ class EnvelopedEntryWriter implements Writer {
@Override
public synchronized void writeRecord(LogRecord record,
- Promise<DLSN> transmitPromise)
+ CompletableFuture<DLSN> transmitPromise)
throws LogRecordTooLongException, WriteException {
int logRecordSize = record.getPersistentSize();
if (logRecordSize > MAX_LOGRECORD_SIZE) {
@@ -121,7 +121,7 @@ class EnvelopedEntryWriter implements Writer {
private synchronized void satisfyPromises(long lssn, long entryId) {
long nextSlotId = 0;
for (WriteRequest request : writeRequests) {
- request.promise.setValue(new DLSN(lssn, entryId, nextSlotId));
+ request.promise.complete(new DLSN(lssn, entryId, nextSlotId));
nextSlotId += request.numRecords;
}
writeRequests.clear();
@@ -129,7 +129,7 @@ class EnvelopedEntryWriter implements Writer {
private synchronized void cancelPromises(Throwable reason) {
for (WriteRequest request : writeRequests) {
- request.promise.setException(reason);
+ request.promise.completeExceptionally(reason);
}
writeRequests.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
deleted file mode 100644
index baf3182..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogReader.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import org.apache.distributedlog.io.AsyncCloseable;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.List;
-
-/**
- * <i>LogReader</i> is a `synchronous` reader reading records from a DL log.
- *
- * <h3>Lifecycle of a Reader</h3>
- *
- * A reader is a <i>sequential</i> reader that read records from a DL log starting
- * from a given position. The position could be a <i>DLSN</i> (via {@link DistributedLogManager#getInputStream(DLSN)}
- * or a <i>Transaction ID</i> (via {@link DistributedLogManager#getInputStream(long)}.
- * <p>
- * After the reader is open, it could call {@link #readNext(boolean)} or {@link #readBulk(boolean, int)}
- * to read records out the log from provided position.
- * <p>
- * Closing the reader (via {@link #close()} will release all the resources occupied
- * by this reader instance.
- * <p>
- * Exceptions could be thrown during reading records. Once the exception is thrown,
- * the reader is set to an error state and it isn't usable anymore. It is the application's
- * responsibility to handle the exceptions and re-create readers if necessary.
- * <p>
- * Example:
- * <pre>
- * DistributedLogManager dlm = ...;
- * long nextTxId = ...;
- * LogReader reader = dlm.getInputStream(nextTxId);
- *
- * while (true) { // keep reading & processing records
- * LogRecord record;
- * try {
- * record = reader.readNext(false);
- * nextTxId = record.getTransactionId();
- * // process the record
- * ...
- * } catch (IOException ioe) {
- * // handle the exception
- * ...
- * reader = dlm.getInputStream(nextTxId + 1);
- * }
- * }
- *
- * </pre>
- *
- * <h3>Read Records</h3>
- *
- * Reading records from an <i>endless</i> log in `synchronous` way isn't as
- * trivial as in `asynchronous` way (via {@link AsyncLogReader}. Because it
- * lacks of callback mechanism. LogReader introduces a flag `nonBlocking` on
- * controlling the <i>waiting</i> behavior on `synchronous` reads.
- *
- * <h4>Blocking vs NonBlocking</h4>
- *
- * <i>Blocking</i> (nonBlocking = false) means the reads will wait for records
- * before returning read calls. While <i>NonBlocking</i> (nonBlocking = true)
- * means the reads will only check readahead cache and return whatever records
- * available in the readahead cache.
- * <p>
- * The <i>waiting</i> period varies in <i>blocking</i> mode. If the reader is
- * catching up with writer (there are records in the log), the read call will
- * wait until records are read and returned. If the reader is caught up with
- * writer (there are no more records in the log at read time), the read call
- * will wait for a small period of time (defined in
- * {@link DistributedLogConfiguration#getReadAheadWaitTime()} and return whatever
- * records available in the readahead cache. In other words, if a reader sees
- * no record on blocking reads, it means the reader is `caught-up` with the
- * writer.
- * <p>
- * <i>Blocking</i> and <i>NonBlocking</i> modes are useful for building replicated
- * state machines. Applications could use <i>blocking</i> reads till caught up
- * with latest data. Once they are caught up with latest data, they could start
- * serving their service and turn to <i>non-blocking</i> read mode and tail read
- * data from the logs.
- * <p>
- * See examples below.
- *
- * <h4>Read Single Record</h4>
- *
- * {@link #readNext(boolean)} is reading individual records from a DL log.
- *
- * <pre>
- * LogReader reader = ...
- *
- * // keep reading records in blocking way until no records available in the log
- * LogRecord record = reader.readNext(false);
- * while (null != record) {
- * // process the record
- * ...
- * // read next record
- * records = reader.readNext(false);
- * }
- *
- * ...
- *
- * // reader is caught up with writer, doing non-blocking reads to tail the log
- * while (true) {
- * record = reader.readNext(true)
- * // process the new records
- * ...
- * }
- * </pre>
- *
- * <h4>Read Batch of Records</h4>
- *
- * {@link #readBulk(boolean, int)} is a convenient way to read a batch of records
- * from a DL log.
- *
- * <pre>
- * LogReader reader = ...
- * int N = 10;
- *
- * // keep reading N records in blocking way until no records available in the log
- * List<LogRecord> records = reader.readBulk(false, N);
- * while (!records.isEmpty()) {
- * // process the list of records
- * ...
- * if (records.size() < N) { // no more records available in the log
- * break;
- * }
- * // read next N records
- * records = reader.readBulk(false, N);
- * }
- *
- * ...
- *
- * // reader is caught up with writer, doing non-blocking reads to tail the log
- * while (true) {
- * records = reader.readBulk(true, N)
- * // process the new records
- * ...
- * }
- *
- * </pre>
- *
- * <p>
- * NOTE: Extending {@link AsyncCloseable}: BKSyncLogReader is implemented based on BKAsyncLogReader, exposing
- * the {@link AsyncCloseable} interface so the reader could be closed asynchronously
- *
- * @see AsyncLogReader
- */
-public interface LogReader extends Closeable, AsyncCloseable {
-
- /**
- * Read the next log record from the stream.
- * <p>
- * If <i>nonBlocking</i> is set to true, the call returns immediately by just polling
- * records from read ahead cache. It would return <i>null</i> if there isn't any records
- * available in the read ahead cache.
- * <p>
- * If <i>nonBlocking</i> is set to false, it would does blocking call. The call will
- * block until return a record if there are records in the stream (aka catching up).
- * Otherwise it would wait up to {@link DistributedLogConfiguration#getReadAheadWaitTime()}
- * milliseconds and return null if there isn't any more records in the stream.
- *
- * @param nonBlocking should the read make blocking calls to the backend or rely on the
- * readAhead cache
- * @return an operation from the stream or null if at end of stream
- * @throws IOException if there is an error reading from the stream
- */
- public LogRecordWithDLSN readNext(boolean nonBlocking) throws IOException;
-
- /**
- * Read the next <i>numLogRecords</i> log records from the stream
- *
- * @param nonBlocking should the read make blocking calls to the backend or rely on the
- * readAhead cache
- * @param numLogRecords maximum number of log records returned by this call.
- * @return an operation from the stream or empty list if at end of stream
- * @throws IOException if there is an error reading from the stream
- * @see #readNext(boolean)
- */
- public List<LogRecordWithDLSN> readBulk(boolean nonBlocking, int numLogRecords) throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
index c5050ec..462ddaa 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LogSegmentMetadata.java
@@ -23,14 +23,13 @@ import java.util.Comparator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.LogSegmentNotFoundException;
import org.apache.distributedlog.exceptions.UnsupportedMetadataVersionException;
import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -590,21 +589,21 @@ public class LogSegmentMetadata {
.build();
}
- public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path) {
+ public static CompletableFuture<LogSegmentMetadata> read(ZooKeeperClient zkc, String path) {
return read(zkc, path, false);
}
- public static Future<LogSegmentMetadata> read(ZooKeeperClient zkc, String path, final boolean skipMinVersionCheck) {
- final Promise<LogSegmentMetadata> result = new Promise<LogSegmentMetadata>();
+ public static CompletableFuture<LogSegmentMetadata> read(ZooKeeperClient zkc, String path, final boolean skipMinVersionCheck) {
+ final CompletableFuture<LogSegmentMetadata> result = new CompletableFuture<LogSegmentMetadata>();
try {
zkc.get().getData(path, false, new AsyncCallback.DataCallback() {
@Override
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
if (KeeperException.Code.OK.intValue() != rc) {
if (KeeperException.Code.NONODE.intValue() == rc) {
- FutureUtils.setException(result, new LogSegmentNotFoundException(path));
+ FutureUtils.completeExceptionally(result, new LogSegmentNotFoundException(path));
} else {
- FutureUtils.setException(result,
+ FutureUtils.completeExceptionally(result,
new ZKException("Failed to read log segment metadata from " + path,
KeeperException.Code.get(rc)));
}
@@ -612,17 +611,17 @@ public class LogSegmentMetadata {
}
try {
LogSegmentMetadata metadata = parseData(path, data, skipMinVersionCheck);
- FutureUtils.setValue(result, metadata);
+ FutureUtils.complete(result, metadata);
} catch (IOException ie) {
LOG.error("Error on parsing log segment metadata from {} : ", path, ie);
- result.setException(ie);
+ result.completeExceptionally(ie);
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- result.setException(FutureUtils.zkException(e, path));
+ result.completeExceptionally(Utils.zkException(e, path));
} catch (InterruptedException e) {
- result.setException(FutureUtils.zkException(e, path));
+ result.completeExceptionally(Utils.zkException(e, path));
}
return result;
}