You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:44 UTC
[12/23] incubator-distributedlog git commit: DL-124: Use Java8 Future
rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
index 4dd4c12..14ebf4a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryStore.java
@@ -17,6 +17,7 @@
*/
package org.apache.distributedlog.impl.logsegment;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogSegmentMetadata;
@@ -35,10 +36,8 @@ import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.util.Allocator;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -65,13 +64,13 @@ public class BKLogSegmentEntryStore implements
private final LogSegmentMetadata segment;
private final long startEntryId;
- private final Promise<LogSegmentEntryReader> openPromise;
+ private final CompletableFuture<LogSegmentEntryReader> openPromise;
OpenReaderRequest(LogSegmentMetadata segment,
long startEntryId) {
this.segment = segment;
this.startEntryId = startEntryId;
- this.openPromise = new Promise<LogSegmentEntryReader>();
+ this.openPromise = new CompletableFuture<LogSegmentEntryReader>();
}
}
@@ -79,11 +78,11 @@ public class BKLogSegmentEntryStore implements
private static class DeleteLogSegmentRequest {
private final LogSegmentMetadata segment;
- private final Promise<LogSegmentMetadata> deletePromise;
+ private final CompletableFuture<LogSegmentMetadata> deletePromise;
DeleteLogSegmentRequest(LogSegmentMetadata segment) {
this.segment = segment;
- this.deletePromise = new Promise<LogSegmentMetadata>();
+ this.deletePromise = new CompletableFuture<LogSegmentMetadata>();
}
}
@@ -119,13 +118,13 @@ public class BKLogSegmentEntryStore implements
}
@Override
- public Future<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) {
+ public CompletableFuture<LogSegmentMetadata> deleteLogSegment(LogSegmentMetadata segment) {
DeleteLogSegmentRequest request = new DeleteLogSegmentRequest(segment);
BookKeeper bk;
try {
bk = this.bkc.get();
} catch (IOException e) {
- return Future.exception(e);
+ return FutureUtils.exception(e);
}
bk.asyncDeleteLedger(segment.getLogSegmentId(), this, request);
return request.deletePromise;
@@ -141,11 +140,11 @@ public class BKLogSegmentEntryStore implements
logger.error("Couldn't delete ledger {} from bookkeeper for {} : {}",
new Object[]{ deleteRequest.segment.getLogSegmentId(), deleteRequest.segment,
BKException.getMessage(rc) });
- FutureUtils.setException(deleteRequest.deletePromise,
+ FutureUtils.completeExceptionally(deleteRequest.deletePromise,
new BKTransmitException("Couldn't delete log segment " + deleteRequest.segment, rc));
return;
}
- FutureUtils.setValue(deleteRequest.deletePromise, deleteRequest.segment);
+ FutureUtils.complete(deleteRequest.deletePromise, deleteRequest.segment);
}
//
@@ -186,13 +185,13 @@ public class BKLogSegmentEntryStore implements
//
@Override
- public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
+ public CompletableFuture<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
long startEntryId) {
BookKeeper bk;
try {
bk = this.bkc.get();
} catch (IOException e) {
- return Future.exception(e);
+ return FutureUtils.exception(e);
}
OpenReaderRequest request = new OpenReaderRequest(segment, startEntryId);
if (segment.isInProgress()) {
@@ -217,7 +216,7 @@ public class BKLogSegmentEntryStore implements
public void openComplete(int rc, LedgerHandle lh, Object ctx) {
OpenReaderRequest request = (OpenReaderRequest) ctx;
if (BKException.Code.OK != rc) {
- FutureUtils.setException(
+ FutureUtils.completeExceptionally(
request.openPromise,
new BKTransmitException("Failed to open ledger handle for log segment " + request.segment, rc));
return;
@@ -233,28 +232,28 @@ public class BKLogSegmentEntryStore implements
conf,
statsLogger,
failureInjector);
- FutureUtils.setValue(request.openPromise, reader);
+ FutureUtils.complete(request.openPromise, reader);
} catch (IOException e) {
- FutureUtils.setException(request.openPromise, e);
+ FutureUtils.completeExceptionally(request.openPromise, e);
}
}
@Override
- public Future<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment,
+ public CompletableFuture<LogSegmentRandomAccessEntryReader> openRandomAccessReader(final LogSegmentMetadata segment,
final boolean fence) {
final BookKeeper bk;
try {
bk = this.bkc.get();
} catch (IOException e) {
- return Future.exception(e);
+ return FutureUtils.exception(e);
}
- final Promise<LogSegmentRandomAccessEntryReader> openPromise = new Promise<LogSegmentRandomAccessEntryReader>();
+ final CompletableFuture<LogSegmentRandomAccessEntryReader> openPromise = new CompletableFuture<LogSegmentRandomAccessEntryReader>();
AsyncCallback.OpenCallback openCallback = new AsyncCallback.OpenCallback() {
@Override
public void openComplete(int rc, LedgerHandle lh, Object ctx) {
if (BKException.Code.OK != rc) {
- FutureUtils.setException(
+ FutureUtils.completeExceptionally(
openPromise,
new BKTransmitException("Failed to open ledger handle for log segment " + segment, rc));
return;
@@ -263,7 +262,7 @@ public class BKLogSegmentEntryStore implements
segment,
lh,
conf);
- FutureUtils.setValue(openPromise, reader);
+ FutureUtils.complete(openPromise, reader);
}
};
if (segment.isInProgress() && !fence) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
index d7b331b..254345e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentRandomAccessEntryReader.java
@@ -18,14 +18,13 @@
package org.apache.distributedlog.impl.logsegment;
import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.exceptions.BKTransmitException;
import org.apache.distributedlog.logsegment.LogSegmentRandomAccessEntryReader;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.bookkeeper.client.AsyncCallback.ReadCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -49,7 +48,7 @@ class BKLogSegmentRandomAccessEntryReader implements
// state
private final LogSegmentMetadata metadata;
private final LedgerHandle lh;
- private Promise<Void> closePromise = null;
+ private CompletableFuture<Void> closePromise = null;
BKLogSegmentRandomAccessEntryReader(LogSegmentMetadata metadata,
LedgerHandle lh,
@@ -68,8 +67,8 @@ class BKLogSegmentRandomAccessEntryReader implements
}
@Override
- public Future<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) {
- Promise<List<Entry.Reader>> promise = new Promise<List<Entry.Reader>>();
+ public CompletableFuture<List<Entry.Reader>> readEntries(long startEntryId, long endEntryId) {
+ CompletableFuture<List<Entry.Reader>> promise = new CompletableFuture<List<Entry.Reader>>();
lh.asyncReadEntries(startEntryId, endEntryId, this, promise);
return promise;
}
@@ -86,34 +85,37 @@ class BKLogSegmentRandomAccessEntryReader implements
@Override
public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> entries, Object ctx) {
- Promise<List<Entry.Reader>> promise = (Promise<List<Entry.Reader>>) ctx;
+ CompletableFuture<List<Entry.Reader>> promise = (CompletableFuture<List<Entry.Reader>>) ctx;
if (BKException.Code.OK == rc) {
List<Entry.Reader> entryList = Lists.newArrayList();
while (entries.hasMoreElements()) {
try {
entryList.add(processReadEntry(entries.nextElement()));
} catch (IOException ioe) {
- FutureUtils.setException(promise, ioe);
+ FutureUtils.completeExceptionally(promise, ioe);
return;
}
}
- FutureUtils.setValue(promise, entryList);
+ FutureUtils.complete(promise, entryList);
} else {
- FutureUtils.setException(promise,
+ FutureUtils.completeExceptionally(promise,
new BKTransmitException("Failed to read entries :", rc));
}
}
@Override
- public Future<Void> asyncClose() {
- final Promise<Void> closeFuture;
+ public CompletableFuture<Void> asyncClose() {
+ final CompletableFuture<Void> closeFuture;
synchronized (this) {
if (null != closePromise) {
return closePromise;
}
- closeFuture = closePromise = new Promise<Void>();
+ closeFuture = closePromise = new CompletableFuture<Void>();
}
- BKUtils.closeLedgers(lh).proxyTo(closeFuture);
+ FutureUtils.proxyTo(
+ BKUtils.closeLedgers(lh),
+ closeFuture
+ );
return closeFuture;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java
index 3c02740..82ba775 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKUtils.java
@@ -18,11 +18,9 @@
package org.apache.distributedlog.impl.logsegment;
import com.google.common.collect.Lists;
-import org.apache.distributedlog.function.VoidFunctions;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
-import com.twitter.util.Promise;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.common.functions.VoidFunctions;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -40,15 +38,15 @@ public class BKUtils {
* @param lh ledger handle
* @return future represents close result.
*/
- public static Future<Void> closeLedger(LedgerHandle lh) {
- final Promise<Void> closePromise = new Promise<Void>();
+ public static CompletableFuture<Void> closeLedger(LedgerHandle lh) {
+ final CompletableFuture<Void> closePromise = new CompletableFuture<Void>();
lh.asyncClose(new AsyncCallback.CloseCallback() {
@Override
public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
if (BKException.Code.OK != rc) {
- FutureUtils.setException(closePromise, BKException.create(rc));
+ FutureUtils.completeExceptionally(closePromise, BKException.create(rc));
} else {
- FutureUtils.setValue(closePromise, null);
+ FutureUtils.complete(closePromise, null);
}
}
}, null);
@@ -61,12 +59,12 @@ public class BKUtils {
* @param lhs a list of ledgers
* @return future represents close results.
*/
- public static Future<Void> closeLedgers(LedgerHandle ... lhs) {
- List<Future<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length);
+ public static CompletableFuture<Void> closeLedgers(LedgerHandle ... lhs) {
+ List<CompletableFuture<Void>> closeResults = Lists.newArrayListWithExpectedSize(lhs.length);
for (LedgerHandle lh : lhs) {
closeResults.add(closeLedger(lh));
}
- return Futures.collect(closeResults).map(VoidFunctions.LIST_TO_VOID_FUNC);
+ return FutureUtils.collect(closeResults).thenApply(VoidFunctions.LIST_TO_VOID_FUNC);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 30f9dd4..9b02462 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -20,10 +20,12 @@ package org.apache.distributedlog.impl.metadata;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LockCancelledException;
@@ -42,18 +44,14 @@ import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForReader;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.FutureUtils;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.zk.LimitedPermitManager;
import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.PermitManager;
+import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.ZKTransaction;
-import com.twitter.util.ExceptionalFunction;
-import com.twitter.util.ExceptionalFunction0;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.versioning.Versioned;
@@ -69,10 +67,7 @@ import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;
@@ -120,14 +115,9 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
private synchronized OrderedScheduler getLockStateExecutor(boolean createIfNull) {
if (createIfNull && null == lockStateExecutor) {
- StatsLogger lockStateStatsLogger = statsLogger.scope("lock_scheduler");
lockStateExecutor = OrderedScheduler.newBuilder()
.name("DLM-LockState")
.corePoolSize(conf.getNumLockStateThreads())
- .statsLogger(lockStateStatsLogger)
- .perExecutorStatsLogger(lockStateStatsLogger)
- .traceTaskExecution(conf.getEnableTaskExecutionStats())
- .traceTaskExecutionWarnTimeUs(conf.getTaskExecutionWarnTimeMicros())
.build();
}
return lockStateExecutor;
@@ -174,21 +164,21 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
}
@Override
- public Future<Void> logExists(URI uri, final String logName) {
+ public CompletableFuture<Void> logExists(URI uri, final String logName) {
final String logSegmentsPath = LogMetadata.getLogSegmentsPath(
uri, logName, conf.getUnpartitionedStreamName());
- final Promise<Void> promise = new Promise<Void>();
+ final CompletableFuture<Void> promise = new CompletableFuture<Void>();
try {
final ZooKeeper zk = zooKeeperClient.get();
zk.sync(logSegmentsPath, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int syncRc, String path, Object syncCtx) {
if (KeeperException.Code.NONODE.intValue() == syncRc) {
- promise.setException(new LogNotFoundException(
+ promise.completeExceptionally(new LogNotFoundException(
String.format("Log %s does not exist or has been deleted", logName)));
return;
} else if (KeeperException.Code.OK.intValue() != syncRc){
- promise.setException(new ZKException("Error on checking log existence for " + logName,
+ promise.completeExceptionally(new ZKException("Error on checking log existence for " + logName,
KeeperException.create(KeeperException.Code.get(syncRc))));
return;
}
@@ -196,12 +186,12 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
- promise.setValue(null);
+ promise.complete(null);
} else if (KeeperException.Code.NONODE.intValue() == rc) {
- promise.setException(new LogNotFoundException(
+ promise.completeExceptionally(new LogNotFoundException(
String.format("Log %s does not exist or has been deleted", logName)));
} else {
- promise.setException(new ZKException("Error on checking log existence for " + logName,
+ promise.completeExceptionally(new ZKException("Error on checking log existence for " + logName,
KeeperException.create(KeeperException.Code.get(rc))));
}
}
@@ -211,10 +201,10 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
} catch (InterruptedException ie) {
LOG.error("Interrupted while reading {}", logSegmentsPath, ie);
- promise.setException(new DLInterruptedException("Interrupted while checking "
+ promise.completeExceptionally(new DLInterruptedException("Interrupted while checking "
+ logSegmentsPath, ie));
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
return promise;
}
@@ -237,15 +227,13 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
// Create Read Lock
//
- private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
+ private CompletableFuture<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
final String readLockPath) {
- final Promise<Void> promise = new Promise<Void>();
- promise.setInterruptHandler(new com.twitter.util.Function<Throwable, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable t) {
- FutureUtils.setException(promise, new LockCancelledException(readLockPath,
- "Could not ensure read lock path", t));
- return null;
+ final CompletableFuture<Void> promise = new CompletableFuture<Void>();
+ promise.whenComplete((value, cause) -> {
+ if (cause instanceof CancellationException) {
+ FutureUtils.completeExceptionally(promise, new LockCancelledException(readLockPath,
+ "Could not ensure read lock path", cause));
}
});
Optional<String> parentPathShouldNotCreate = Optional.of(logMetadata.getLogRootPath());
@@ -255,21 +243,21 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
@Override
public void processResult(final int rc, final String path, Object ctx, String name) {
if (KeeperException.Code.NONODE.intValue() == rc) {
- FutureUtils.setException(promise, new LogNotFoundException(
+ FutureUtils.completeExceptionally(promise, new LogNotFoundException(
String.format("Log %s does not exist or has been deleted",
logMetadata.getFullyQualifiedName())));
} else if (KeeperException.Code.OK.intValue() == rc) {
- FutureUtils.setValue(promise, null);
+ FutureUtils.complete(promise, null);
LOG.trace("Created path {}.", path);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
- FutureUtils.setValue(promise, null);
+ FutureUtils.complete(promise, null);
LOG.trace("Path {} is already existed.", path);
} else if (DistributedLogConstants.ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
- FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
+ FutureUtils.completeExceptionally(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
} else if (DistributedLogConstants.DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
- FutureUtils.setException(promise, new DLInterruptedException(path));
+ FutureUtils.completeExceptionally(promise, new DLInterruptedException(path));
} else {
- FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
+ FutureUtils.completeExceptionally(promise, KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
@@ -277,28 +265,19 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
}
@Override
- public Future<DistributedLock> createReadLock(final LogMetadataForReader metadata,
+ public CompletableFuture<DistributedLock> createReadLock(final LogMetadataForReader metadata,
Optional<String> readerId) {
final String readLockPath = metadata.getReadLockPath(readerId);
- return ensureReadLockPathExist(metadata, readLockPath).flatMap(
- new ExceptionalFunction<Void, Future<DistributedLock>>() {
- @Override
- public Future<DistributedLock> applyE(Void value) throws Throwable {
- // Unfortunately this has a blocking call which we should not execute on the
- // ZK completion thread
- return scheduler.apply(new ExceptionalFunction0<DistributedLock>() {
- @Override
- public DistributedLock applyE() throws Throwable {
- return new ZKDistributedLock(
- getLockStateExecutor(true),
- getLockFactory(true),
- readLockPath,
- conf.getLockTimeoutMilliSeconds(),
- statsLogger.scope("read_lock"));
- }
- });
- }
- });
+ return ensureReadLockPathExist(metadata, readLockPath)
+ .thenApplyAsync((value) -> {
+ DistributedLock lock = new ZKDistributedLock(
+ getLockStateExecutor(true),
+ getLockFactory(true),
+ readLockPath,
+ conf.getLockTimeoutMilliSeconds(),
+ statsLogger.scope("read_lock"));
+ return lock;
+ }, scheduler.chooseExecutor(readLockPath));
}
//
@@ -329,7 +308,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
(byte) (i)};
}
- static Future<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
+ static CompletableFuture<List<Versioned<byte[]>>> checkLogMetadataPaths(ZooKeeper zk,
String logRootPath,
boolean ownAllocator) {
// Note re. persistent lock state initialization: the read lock persistent state (path) is
@@ -344,7 +323,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
final String allocationPath = logRootPath + ALLOCATION_PATH;
int numPaths = ownAllocator ? MetadataIndex.ALLOCATION + 1 : MetadataIndex.LOGSEGMENTS + 1;
- List<Future<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
+ List<CompletableFuture<Versioned<byte[]>>> checkFutures = Lists.newArrayListWithExpectedSize(numPaths);
checkFutures.add(Utils.zkGetData(zk, logRootParentPath, false));
checkFutures.add(Utils.zkGetData(zk, logRootPath, false));
checkFutures.add(Utils.zkGetData(zk, maxTxIdPath, false));
@@ -356,7 +335,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
checkFutures.add(Utils.zkGetData(zk, allocationPath, false));
}
- return Future.collect(checkFutures);
+ return FutureUtils.collect(checkFutures);
}
static boolean pathExists(Versioned<byte[]> metadata) {
@@ -374,7 +353,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
final List<ACL> acl,
final boolean ownAllocator,
final boolean createIfNotExists,
- final Promise<List<Versioned<byte[]>>> promise) {
+ final CompletableFuture<List<Versioned<byte[]>>> promise) {
final List<byte[]> pathsToCreate = Lists.newArrayListWithExpectedSize(metadatas.size());
final List<Op> zkOps = Lists.newArrayListWithExpectedSize(metadatas.size());
CreateMode createMode = CreateMode.PERSISTENT;
@@ -447,11 +426,11 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
}
if (zkOps.isEmpty()) {
// nothing missed
- promise.setValue(metadatas);
+ promise.complete(metadatas);
return;
}
if (!createIfNotExists) {
- promise.setException(new LogNotFoundException("Log " + logRootPath + " not found"));
+ promise.completeExceptionally(new LogNotFoundException("Log " + logRootPath + " not found"));
return;
}
@@ -469,9 +448,9 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
finalMetadatas.add(new Versioned<byte[]>(dataCreated, new ZkVersion(0)));
}
}
- promise.setValue(finalMetadatas);
+ promise.complete(finalMetadatas);
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
- promise.setException(new LogExistsException("Someone just created log "
+ promise.completeExceptionally(new LogExistsException("Someone just created log "
+ logRootPath));
} else {
if (LOG.isDebugEnabled()) {
@@ -488,7 +467,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
LOG.debug("Failed to create log, full rc list = {}", resultCodeList);
}
- promise.setException(new ZKException("Failed to create log " + logRootPath,
+ promise.completeExceptionally(new ZKException("Failed to create log " + logRootPath,
KeeperException.Code.get(rc)));
}
}
@@ -538,7 +517,7 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
}
}
- static Future<LogMetadataForWriter> getLog(final URI uri,
+ static CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
final String logName,
final String logIdentifier,
final ZooKeeperClient zooKeeperClient,
@@ -549,42 +528,47 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
PathUtils.validatePath(logRootPath);
} catch (IllegalArgumentException e) {
LOG.error("Illegal path value {} for stream {}", new Object[]{logRootPath, logName, e});
- return Future.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
+ return FutureUtils.exception(new InvalidStreamNameException(logName, "Log name is invalid"));
}
try {
final ZooKeeper zk = zooKeeperClient.get();
return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
- .flatMap(new AbstractFunction1<List<Versioned<byte[]>>, Future<List<Versioned<byte[]>>>>() {
+ .thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<List<Versioned<byte[]>>>>() {
@Override
- public Future<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
- Promise<List<Versioned<byte[]>>> promise =
- new Promise<List<Versioned<byte[]>>>();
+ public CompletableFuture<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
+ CompletableFuture<List<Versioned<byte[]>>> promise =
+ new CompletableFuture<List<Versioned<byte[]>>>();
createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
ownAllocator, createIfNotExists, promise);
return promise;
}
- }).map(new ExceptionalFunction<List<Versioned<byte[]>>, LogMetadataForWriter>() {
+ }).thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<LogMetadataForWriter>>() {
@Override
- public LogMetadataForWriter applyE(List<Versioned<byte[]>> metadatas) throws DLException {
- return processLogMetadatas(
- uri,
- logName,
- logIdentifier,
- metadatas,
- ownAllocator);
+ public CompletableFuture<LogMetadataForWriter> apply(List<Versioned<byte[]>> metadatas) {
+ try {
+ return FutureUtils.value(
+ processLogMetadatas(
+ uri,
+ logName,
+ logIdentifier,
+ metadatas,
+ ownAllocator));
+ } catch (UnexpectedException e) {
+ return FutureUtils.exception(e);
+ }
}
});
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- return Future.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName,
+ return FutureUtils.exception(new ZKException("Encountered zookeeper connection issue on creating log " + logName,
KeeperException.Code.CONNECTIONLOSS));
} catch (InterruptedException e) {
- return Future.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
+ return FutureUtils.exception(new DLInterruptedException("Interrupted on creating log " + logName, e));
}
}
@Override
- public Future<LogMetadataForWriter> getLog(final URI uri,
+ public CompletableFuture<LogMetadataForWriter> getLog(final URI uri,
final String logName,
final boolean ownAllocator,
final boolean createIfNotExists) {
@@ -602,30 +586,30 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
//
@Override
- public Future<Void> deleteLog(URI uri, final String logName) {
- final Promise<Void> promise = new Promise<Void>();
+ public CompletableFuture<Void> deleteLog(URI uri, final String logName) {
+ final CompletableFuture<Void> promise = new CompletableFuture<Void>();
try {
String streamPath = LogMetadata.getLogStreamPath(uri, logName);
ZKUtil.deleteRecursive(zooKeeperClient.get(), streamPath, new AsyncCallback.VoidCallback() {
@Override
public void processResult(int rc, String path, Object ctx) {
if (KeeperException.Code.OK.intValue() != rc) {
- FutureUtils.setException(promise,
+ FutureUtils.completeExceptionally(promise,
new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, KeeperException.Code.get(rc)));
return;
}
- FutureUtils.setValue(promise, null);
+ FutureUtils.complete(promise, null);
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+ FutureUtils.completeExceptionally(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, KeeperException.Code.CONNECTIONLOSS));
} catch (InterruptedException e) {
- FutureUtils.setException(promise, new DLInterruptedException("Interrupted while deleting log stream "
+ FutureUtils.completeExceptionally(promise, new DLInterruptedException("Interrupted while deleting log stream "
+ logName));
} catch (KeeperException e) {
- FutureUtils.setException(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+ FutureUtils.completeExceptionally(promise, new ZKException("Encountered zookeeper issue on deleting log stream "
+ logName, e));
}
return promise;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
index 64abb77..302c666 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionStateStore.java
@@ -17,27 +17,22 @@
*/
package org.apache.distributedlog.impl.subscription;
+import com.google.common.base.Charsets;
import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
-
-import org.apache.distributedlog.subscription.SubscriptionStateStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
-
-import com.google.common.base.Charsets;
-
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.api.subscription.SubscriptionStateStore;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
-
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.util.Utils;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class ZKSubscriptionStateStore implements SubscriptionStateStore {
@@ -60,16 +55,16 @@ public class ZKSubscriptionStateStore implements SubscriptionStateStore {
* Get the last committed position stored for this subscription
*/
@Override
- public Future<DLSN> getLastCommitPosition() {
+ public CompletableFuture<DLSN> getLastCommitPosition() {
if (null != lastCommittedPosition.get()) {
- return Future.value(lastCommittedPosition.get());
+ return FutureUtils.value(lastCommittedPosition.get());
} else {
return getLastCommitPositionFromZK();
}
}
- Future<DLSN> getLastCommitPositionFromZK() {
- final Promise<DLSN> result = new Promise<DLSN>();
+ CompletableFuture<DLSN> getLastCommitPositionFromZK() {
+ final CompletableFuture<DLSN> result = new CompletableFuture<DLSN>();
try {
logger.debug("Reading last commit position from path {}", zkPath);
zooKeeperClient.get().getData(zkPath, false, new AsyncCallback.DataCallback() {
@@ -77,25 +72,25 @@ public class ZKSubscriptionStateStore implements SubscriptionStateStore {
public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
logger.debug("Read last commit position from path {}: rc = {}", zkPath, rc);
if (KeeperException.Code.NONODE.intValue() == rc) {
- result.setValue(DLSN.NonInclusiveLowerBound);
+ result.complete(DLSN.NonInclusiveLowerBound);
} else if (KeeperException.Code.OK.intValue() != rc) {
- result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
+ result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
} else {
try {
DLSN dlsn = DLSN.deserialize(new String(data, Charsets.UTF_8));
- result.setValue(dlsn);
+ result.complete(dlsn);
} catch (Exception t) {
logger.warn("Invalid last commit position found from path {}", zkPath, t);
// invalid dlsn recorded in subscription state store
- result.setValue(DLSN.NonInclusiveLowerBound);
+ result.complete(DLSN.NonInclusiveLowerBound);
}
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
- result.setException(zkce);
+ result.completeExceptionally(zkce);
} catch (InterruptedException ie) {
- result.setException(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
+ result.completeExceptionally(new DLInterruptedException("getLastCommitPosition was interrupted", ie));
}
return result;
}
@@ -106,7 +101,7 @@ public class ZKSubscriptionStateStore implements SubscriptionStateStore {
* @param newPosition - new commit position
*/
@Override
- public Future<BoxedUnit> advanceCommitPosition(DLSN newPosition) {
+ public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition) {
if (null == lastCommittedPosition.get() ||
(newPosition.compareTo(lastCommittedPosition.get()) > 0)) {
lastCommittedPosition.set(newPosition);
@@ -115,7 +110,7 @@ public class ZKSubscriptionStateStore implements SubscriptionStateStore {
zooKeeperClient.getDefaultACL(),
CreateMode.PERSISTENT);
} else {
- return Future.Done();
+ return FutureUtils.Void();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
index d75f5fc..0392264 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/subscription/ZKSubscriptionsStore.java
@@ -17,30 +17,26 @@
*/
package org.apache.distributedlog.impl.subscription;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.subscription.SubscriptionStateStore;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
-import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.zookeeper.AsyncCallback;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.data.Stat;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.DLSN;
+import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.api.subscription.SubscriptionStateStore;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Stat;
/**
* ZooKeeper Based Subscriptions Store.
@@ -82,72 +78,62 @@ public class ZKSubscriptionsStore implements SubscriptionsStore {
}
@Override
- public Future<DLSN> getLastCommitPosition(String subscriberId) {
+ public CompletableFuture<DLSN> getLastCommitPosition(String subscriberId) {
return getSubscriber(subscriberId).getLastCommitPosition();
}
@Override
- public Future<Map<String, DLSN>> getLastCommitPositions() {
- final Promise<Map<String, DLSN>> result = new Promise<Map<String, DLSN>>();
+ public CompletableFuture<Map<String, DLSN>> getLastCommitPositions() {
+ final CompletableFuture<Map<String, DLSN>> result = new CompletableFuture<Map<String, DLSN>>();
try {
this.zkc.get().getChildren(this.zkPath, false, new AsyncCallback.Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
if (KeeperException.Code.NONODE.intValue() == rc) {
- result.setValue(new HashMap<String, DLSN>());
+ result.complete(new HashMap<String, DLSN>());
} else if (KeeperException.Code.OK.intValue() != rc) {
- result.setException(KeeperException.create(KeeperException.Code.get(rc), path));
+ result.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc), path));
} else {
getLastCommitPositions(result, children);
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException zkce) {
- result.setException(zkce);
+ result.completeExceptionally(zkce);
} catch (InterruptedException ie) {
- result.setException(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
+ result.completeExceptionally(new DLInterruptedException("getLastCommitPositions was interrupted", ie));
}
return result;
}
- private void getLastCommitPositions(final Promise<Map<String, DLSN>> result,
+ private void getLastCommitPositions(final CompletableFuture<Map<String, DLSN>> result,
List<String> subscribers) {
- List<Future<Pair<String, DLSN>>> futures =
- new ArrayList<Future<Pair<String, DLSN>>>(subscribers.size());
+ List<CompletableFuture<Pair<String, DLSN>>> futures =
+ new ArrayList<CompletableFuture<Pair<String, DLSN>>>(subscribers.size());
for (String s : subscribers) {
final String subscriber = s;
- Future<Pair<String, DLSN>> future =
+ CompletableFuture<Pair<String, DLSN>> future =
// Get the last commit position from zookeeper
- getSubscriber(subscriber).getLastCommitPositionFromZK().map(
- new AbstractFunction1<DLSN, Pair<String, DLSN>>() {
- @Override
- public Pair<String, DLSN> apply(DLSN dlsn) {
- return Pair.of(subscriber, dlsn);
- }
- });
+ getSubscriber(subscriber).getLastCommitPositionFromZK().thenApply(
+ dlsn -> Pair.of(subscriber, dlsn));
futures.add(future);
}
- Future.collect(futures).foreach(
- new AbstractFunction1<List<Pair<String, DLSN>>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(List<Pair<String, DLSN>> subscriptions) {
- Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
- for (Pair<String, DLSN> pair : subscriptions) {
- subscriptionMap.put(pair.getLeft(), pair.getRight());
- }
- result.setValue(subscriptionMap);
- return BoxedUnit.UNIT;
- }
- });
+ FutureUtils.collect(futures).thenAccept((subscriptions) -> {
+ Map<String, DLSN> subscriptionMap = new HashMap<String, DLSN>();
+ for (Pair<String, DLSN> pair : subscriptions) {
+ subscriptionMap.put(pair.getLeft(), pair.getRight());
+ }
+ result.complete(subscriptionMap);
+ });
}
@Override
- public Future<BoxedUnit> advanceCommitPosition(String subscriberId, DLSN newPosition) {
+ public CompletableFuture<Void> advanceCommitPosition(String subscriberId, DLSN newPosition) {
return getSubscriber(subscriberId).advanceCommitPosition(newPosition);
}
@Override
- public Future<Boolean> deleteSubscriber(String subscriberId) {
+ public CompletableFuture<Boolean> deleteSubscriber(String subscriberId) {
subscribers.remove(subscriberId);
String path = getSubscriberZKPath(subscriberId);
return Utils.zkDeleteIfNotExist(zkc, path, new ZkVersion(-1));
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java
deleted file mode 100644
index b2b430d..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortable.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import java.io.IOException;
-
-/**
- * An {@code Abortable} is a source or destination of data that can be aborted.
- * The abort method is invoked to release resources that the object is holding
- * (such as open files). The abort happens when the object is in an error state,
- * which it couldn't be closed gracefully.
- *
- * @see java.io.Closeable
- * @since 0.3.32
- */
-public interface Abortable {
-
- /**
- * Aborts the object and releases any resources associated with it.
- * If the object is already aborted then invoking this method has no
- * effect.
- *
- * @throws IOException if an I/O error occurs.
- */
- public void abort() throws IOException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java
deleted file mode 100644
index a4838b1..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/Abortables.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.function.VoidFunctions;
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Future;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Utility methods for working with {@link Abortable} objects.
- *
- * @since 0.3.32
- */
-public final class Abortables {
-
- static final Logger logger = LoggerFactory.getLogger(Abortables.class);
-
- private Abortables() {}
-
- public static Future<Void> asyncAbort(@Nullable AsyncAbortable abortable,
- boolean swallowIOException) {
- if (null == abortable) {
- return Future.Void();
- } else if (swallowIOException) {
- return FutureUtils.ignore(abortable.asyncAbort());
- } else {
- return abortable.asyncAbort();
- }
- }
-
- /**
- * Aborts a {@link Abortable}, with control over whether an {@link IOException} may be thrown.
- * This is primarily useful in a finally block, where a thrown exception needs to be logged but
- * not propagated (otherwise the original exception will be lost).
- *
- * <p>If {@code swallowIOException} is true then we never throw {@code IOException} but merely log it.
- *
- * <p>Example: <pre> {@code
- *
- * public void abortStreamNicely() throws IOException {
- * SomeStream stream = new SomeStream("foo");
- * try {
- * // ... code which does something with the stream ...
- * } catch (IOException ioe) {
- * // If an exception occurs, we might abort the stream.
- * Abortables.abort(stream, true);
- * }
- * }}</pre>
- *
- * @param abortable the {@code Abortable} object to be aborted, or null, in which case this method
- * does nothing.
- * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods
- * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException}
- */
- public static void abort(@Nullable Abortable abortable,
- boolean swallowIOException)
- throws IOException {
- if (null == abortable) {
- return;
- }
- try {
- abortable.abort();
- } catch (IOException ioe) {
- if (swallowIOException) {
- logger.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe);
- } else {
- throw ioe;
- }
- }
- }
-
- /**
- * Abort async <i>abortable</i>
- *
- * @param abortable the {@code AsyncAbortable} object to be aborted, or null, in which case this method
- * does nothing.
- * @param swallowIOException if true, don't propagate IO exceptions thrown by the {@code abort} methods
- * @throws IOException if {@code swallowIOException} is false and {@code abort} throws an {@code IOException}
- * @see #abort(Abortable, boolean)
- */
- public static void abort(@Nullable AsyncAbortable abortable,
- boolean swallowIOException)
- throws IOException {
- if (null == abortable) {
- return;
- }
- try {
- FutureUtils.result(abortable.asyncAbort());
- } catch (IOException ioe) {
- if (swallowIOException) {
- logger.warn("IOException thrown while aborting Abortable {} : ", abortable, ioe);
- } else {
- throw ioe;
- }
- }
- }
-
- /**
- * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than
- * propagating it.
- *
- * While it's not safe in the general case to ignore exceptions that are thrown when aborting an
- * I/O resource, it should generally be safe in the case of a resource that's being used only for
- * reading.
- *
- * @param abortable the {@code Abortable} to be closed, or {@code null} in which case this method
- * does nothing.
- */
- public static void abortQuietly(@Nullable Abortable abortable) {
- try {
- abort(abortable, true);
- } catch (IOException e) {
- logger.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e);
- }
- }
-
- /**
- * Aborts the given {@code abortable}, logging any {@code IOException} that's thrown rather than
- * propagating it.
- *
- * While it's not safe in the general case to ignore exceptions that are thrown when aborting an
- * I/O resource, it should generally be safe in the case of a resource that's being used only for
- * reading.
- *
- * @param abortable the {@code AsyncAbortable} to be closed, or {@code null} in which case this method
- * does nothing.
- */
- public static void abortQuietly(@Nullable AsyncAbortable abortable) {
- try {
- abort(abortable, true);
- } catch (IOException e) {
- logger.error("Unexpected IOException thrown while aborting Abortable {} quietly : ", abortable, e);
- }
- }
-
- /**
- * Abort the abortables in sequence.
- *
- * @param executorService
- * executor service to execute
- * @param abortables
- * abortables to abort
- * @return future represents the abort future
- */
- public static Future<Void> abortSequence(ExecutorService executorService,
- AsyncAbortable... abortables) {
- List<AsyncAbortable> abortableList = Lists.newArrayListWithExpectedSize(abortables.length);
- for (AsyncAbortable abortable : abortables) {
- if (null == abortable) {
- abortableList.add(AsyncAbortable.NULL);
- } else {
- abortableList.add(abortable);
- }
- }
- return FutureUtils.processList(
- abortableList,
- AsyncAbortable.ABORT_FUNC,
- executorService).map(VoidFunctions.LIST_TO_VOID_FUNC);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
deleted file mode 100644
index 7ec26a1..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncAbortable.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-
-/**
- * An {@code Abortable} is a source or destination of data that can be aborted.
- * The abort method is invoked to release resources that the object is holding
- * (such as open files). The abort happens when the object is in an error state,
- * which it couldn't be closed gracefully.
- *
- * @see AsyncCloseable
- * @see Abortable
- * @since 0.3.43
- */
-public interface AsyncAbortable {
-
- Function<AsyncAbortable, Future<Void>> ABORT_FUNC = new Function<AsyncAbortable, Future<Void>>() {
- @Override
- public Future<Void> apply(AsyncAbortable abortable) {
- return abortable.asyncAbort();
- }
- };
-
- AsyncAbortable NULL = new AsyncAbortable() {
- @Override
- public Future<Void> asyncAbort() {
- return Future.Void();
- }
- };
-
- /**
- * Aborts the object and releases any resources associated with it.
- * If the object is already aborted then invoking this method has no
- * effect.
- *
- * @return future represents the abort result
- */
- Future<Void> asyncAbort();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
deleted file mode 100644
index 2bf0119..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncCloseable.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import org.apache.distributedlog.util.FutureUtils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-
-/**
- * A {@code AsyncCloseable} is a source or destination of data that can be closed asynchronously.
- * The close method is invoked to release resources that the object is
- * holding (such as open files).
- */
-public interface AsyncCloseable {
-
- Function<AsyncCloseable, Future<Void>> CLOSE_FUNC = new Function<AsyncCloseable, Future<Void>>() {
- @Override
- public Future<Void> apply(AsyncCloseable closeable) {
- return closeable.asyncClose();
- }
- };
-
- Function<AsyncCloseable, Future<Void>> CLOSE_FUNC_IGNORE_ERRORS = new Function<AsyncCloseable, Future<Void>>() {
- @Override
- public Future<Void> apply(AsyncCloseable closeable) {
- return FutureUtils.ignore(closeable.asyncClose());
- }
- };
-
- AsyncCloseable NULL = new AsyncCloseable() {
- @Override
- public Future<Void> asyncClose() {
- return Future.Void();
- }
- };
-
- /**
- * Closes this source and releases any system resources associated
- * with it. If the source is already closed then invoking this
- * method has no effect.
- *
- * @return future representing the close result.
- */
- Future<Void> asyncClose();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
deleted file mode 100644
index 046c731..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/AsyncDeleteable.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.io;
-
-import com.twitter.util.Future;
-
-/**
- * A {@code AsyncDeleteable} is a source or destination of data that can be deleted asynchronously.
- * This delete method is invoked to delete the source.
- */
-public interface AsyncDeleteable {
- /**
- * Releases any system resources associated with this and delete the source. If the source is
- * already deleted then invoking this method has no effect.
- *
- * @return future representing the deletion result.
- */
- Future<Void> delete();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java
deleted file mode 100644
index eb81cfe..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/io/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * IO Utils for distributedlog
- */
-package org.apache.distributedlog.io;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java
index 95165ef..ae01bf7 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ComposableRequestLimiter.java
@@ -17,16 +17,10 @@
*/
package org.apache.distributedlog.limiter;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-
-import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.limiter.GuavaRateLimiter;
-import org.apache.distributedlog.limiter.RateLimiter;
-
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
-
+import org.apache.distributedlog.exceptions.OverCapacityException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java
index 986678c..156d6dd 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/DistributedLock.java
@@ -17,9 +17,9 @@
*/
package org.apache.distributedlog.lock;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.io.AsyncCloseable;
-import com.twitter.util.Future;
/**
* Interface for distributed locking
@@ -31,7 +31,7 @@ public interface DistributedLock extends AsyncCloseable {
*
* @return future represents the acquire result.
*/
- Future<? extends DistributedLock> asyncAcquire();
+ CompletableFuture<? extends DistributedLock> asyncAcquire();
/**
* Check if hold lock. If it doesn't, then re-acquire the lock.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
index b70098e..1cb3364 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/LockWaiter.java
@@ -17,10 +17,13 @@
*/
package org.apache.distributedlog.lock;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import com.twitter.util.Timer;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.distributedlog.exceptions.DLInterruptedException;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -33,11 +36,11 @@ public class LockWaiter {
private final String lockId;
private final String currentOwner;
- private final Future<Boolean> acquireFuture;
+ private final CompletableFuture<Boolean> acquireFuture;
public LockWaiter(String lockId,
String currentOwner,
- Future<Boolean> acquireFuture) {
+ CompletableFuture<Boolean> acquireFuture) {
this.lockId = lockId;
this.currentOwner = currentOwner;
this.acquireFuture = acquireFuture;
@@ -64,12 +67,13 @@ public class LockWaiter {
/**
* Return the future representing the waiting result.
*
- * <p>If the future is interrupted (e.g. {@link Future#within(Duration, Timer)}),
+ * <p>If the future is interrupted
+ * (e.g. {@link FutureUtils#within(CompletableFuture, long, TimeUnit, Throwable, OrderedScheduler, Object)}),
* the waiter will automatically clean up its waiting state.
*
* @return the future representing the acquire result.
*/
- public Future<Boolean> getAcquireFuture() {
+ public CompletableFuture<Boolean> getAcquireFuture() {
return acquireFuture;
}
@@ -81,12 +85,12 @@ public class LockWaiter {
public boolean waitForAcquireQuietly() {
boolean success = false;
try {
- success = Await.result(acquireFuture);
- } catch (InterruptedException ie) {
+ success = Utils.ioResult(acquireFuture);
+ } catch (DLInterruptedException ie) {
Thread.currentThread().interrupt();
} catch (LockTimeoutException lte) {
logger.debug("Timeout on lock acquiring", lte);
- } catch (Exception e) {
+ } catch (IOException e) {
logger.error("Caught exception waiting for lock acquired", e);
}
return success;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java
index 88abffa..7f770ad 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/NopDistributedLock.java
@@ -17,8 +17,9 @@
*/
package org.apache.distributedlog.lock;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.exceptions.LockingException;
-import com.twitter.util.Future;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
/**
* An implementation of {@link DistributedLock} which does nothing.
@@ -30,8 +31,8 @@ public class NopDistributedLock implements DistributedLock {
private NopDistributedLock() {}
@Override
- public Future<? extends DistributedLock> asyncAcquire() {
- return Future.value(this);
+ public CompletableFuture<? extends DistributedLock> asyncAcquire() {
+ return FutureUtils.value(this);
}
@Override
@@ -45,7 +46,7 @@ public class NopDistributedLock implements DistributedLock {
}
@Override
- public Future<Void> asyncClose() {
- return Future.Void();
+ public CompletableFuture<Void> asyncClose() {
+ return FutureUtils.Void();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java
index 8aec2c0..3a46a13 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLock.java
@@ -17,12 +17,10 @@
*/
package org.apache.distributedlog.lock;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
-import com.twitter.util.Future;
-import scala.runtime.BoxedUnit;
-
-import java.util.concurrent.TimeUnit;
/**
* One time lock.
@@ -71,7 +69,7 @@ public interface SessionLock {
* <i>tryLock</i> here is effectively the combination of following asynchronous calls.
* <pre>
* ZKDistributedLock lock = ...;
- * Future<LockWaiter> attemptFuture = lock.asyncTryLock(...);
+ * CompletableFuture<LockWaiter> attemptFuture = lock.asyncTryLock(...);
*
* boolean acquired = waiter.waitForAcquireQuietly();
* if (acquired) {
@@ -106,7 +104,7 @@ public interface SessionLock {
* @return lock waiter representing this attempt of acquiring lock.
* @see #tryLock(long, TimeUnit)
*/
- Future<LockWaiter> asyncTryLock(long timeout, TimeUnit unit);
+ CompletableFuture<LockWaiter> asyncTryLock(long timeout, TimeUnit unit);
/**
* Release a claimed lock.
@@ -121,6 +119,6 @@ public interface SessionLock {
* @return future representing the result of unlock operation.
* @see #unlock()
*/
- Future<BoxedUnit> asyncUnlock();
+ CompletableFuture<Void> asyncUnlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java
index a68f2d8..9d3159e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/SessionLockFactory.java
@@ -17,7 +17,7 @@
*/
package org.apache.distributedlog.lock;
-import com.twitter.util.Future;
+import java.util.concurrent.CompletableFuture;
/**
* Factory to create {@link SessionLock}
@@ -33,6 +33,6 @@ public interface SessionLockFactory {
* lock context
* @return future represents the creation result.
*/
- Future<SessionLock> createLock(String lockPath, DistributedLockContext context);
+ CompletableFuture<SessionLock> createLock(String lockPath, DistributedLockContext context);
}