You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:45 UTC
[13/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java
deleted file mode 100644
index 79f9c32..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/VoidFunctions.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.function;
-
-import scala.runtime.AbstractFunction1;
-
-import java.util.List;
-
-public class VoidFunctions {
-
- public static final AbstractFunction1<List<Void>, Void> LIST_TO_VOID_FUNC =
- new AbstractFunction1<List<Void>, Void>() {
- @Override
- public Void apply(List<Void> list) {
- return null;
- }
- };
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
index dbe5400..21fe227 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java
@@ -24,7 +24,7 @@ import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.MetadataAccessor;
+import org.apache.distributedlog.api.MetadataAccessor;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.acl.AccessControlManager;
@@ -47,7 +47,7 @@ import org.apache.distributedlog.metadata.LogMetadataStore;
import org.apache.distributedlog.metadata.LogStreamMetadataStore;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.namespace.NamespaceDriverManager;
-import org.apache.distributedlog.subscription.SubscriptionsStore;
+import org.apache.distributedlog.api.subscription.SubscriptionsStore;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.apache.bookkeeper.feature.FeatureProvider;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
index 0761cfc..7069cbb 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogMetadataStore.java
@@ -19,23 +19,22 @@ package org.apache.distributedlog.impl;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
+import java.net.URI;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.metadata.LogMetadataStore;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
-import java.net.URI;
-import java.util.Iterator;
-import java.util.List;
-
import static org.apache.distributedlog.util.DLUtils.*;
/**
@@ -60,18 +59,18 @@ public class ZKLogMetadataStore implements LogMetadataStore {
}
@Override
- public Future<URI> createLog(String logName) {
- return Future.value(namespace);
+ public CompletableFuture<URI> createLog(String logName) {
+ return FutureUtils.value(namespace);
}
@Override
- public Future<Optional<URI>> getLogLocation(String logName) {
- return Future.value(nsOptional);
+ public CompletableFuture<Optional<URI>> getLogLocation(String logName) {
+ return FutureUtils.value(nsOptional);
}
@Override
- public Future<Iterator<String>> getLogs() {
- final Promise<Iterator<String>> promise = new Promise<Iterator<String>>();
+ public CompletableFuture<Iterator<String>> getLogs() {
+ final CompletableFuture<Iterator<String>> promise = new CompletableFuture<Iterator<String>>();
final String nsRootPath = namespace.getPath();
try {
final ZooKeeper zk = zkc.get();
@@ -89,30 +88,30 @@ public class ZKLogMetadataStore implements LogMetadataStore {
results.add(child);
}
}
- promise.setValue(results.iterator());
+ promise.complete(results.iterator());
} else if (KeeperException.Code.NONODE.intValue() == rc) {
List<String> streams = Lists.newLinkedList();
- promise.setValue(streams.iterator());
+ promise.complete(streams.iterator());
} else {
- promise.setException(new ZKException("Error reading namespace " + nsRootPath,
+ promise.completeExceptionally(new ZKException("Error reading namespace " + nsRootPath,
KeeperException.Code.get(rc)));
}
}
}, null);
} else if (KeeperException.Code.NONODE.intValue() == syncRc) {
List<String> streams = Lists.newLinkedList();
- promise.setValue(streams.iterator());
+ promise.complete(streams.iterator());
} else {
- promise.setException(new ZKException("Error reading namespace " + nsRootPath,
+ promise.completeExceptionally(new ZKException("Error reading namespace " + nsRootPath,
KeeperException.Code.get(syncRc)));
}
}
}, null);
zkc.get();
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
return promise;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
index b9cb374..f747045 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -18,6 +18,7 @@
package org.apache.distributedlog.impl;
import com.google.common.collect.ImmutableList;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClient;
@@ -29,17 +30,15 @@ import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
import org.apache.distributedlog.util.DLUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Transaction.OpListener;
+import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.DefaultZKOp;
import org.apache.distributedlog.zk.ZKOp;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.distributedlog.zk.ZKVersionedSetOp;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
@@ -116,7 +115,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
@Override
public void run() {
if (null != store.listeners.get(logSegmentsPath)) {
- store.zkGetLogSegmentNames(logSegmentsPath, store).addEventListener(this);
+ store.zkGetLogSegmentNames(logSegmentsPath, store).whenComplete(this);
} else {
logger.debug("Log segments listener for {} has been removed.", logSegmentsPath);
}
@@ -350,18 +349,18 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
}
@Override
- public Future<LogSegmentMetadata> getLogSegment(String logSegmentPath) {
+ public CompletableFuture<LogSegmentMetadata> getLogSegment(String logSegmentPath) {
return LogSegmentMetadata.read(zkc, logSegmentPath, skipMinVersionCheck);
}
- Future<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) {
- Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
+ CompletableFuture<Versioned<List<String>>> zkGetLogSegmentNames(String logSegmentsPath, Watcher watcher) {
+ CompletableFuture<Versioned<List<String>>> result = new CompletableFuture<Versioned<List<String>>>();
try {
zkc.get().getChildren(logSegmentsPath, watcher, this, result);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- result.setException(FutureUtils.zkException(e, logSegmentsPath));
+ result.completeExceptionally(Utils.zkException(e, logSegmentsPath));
} catch (InterruptedException e) {
- result.setException(FutureUtils.zkException(e, logSegmentsPath));
+ result.completeExceptionally(Utils.zkException(e, logSegmentsPath));
}
return result;
}
@@ -369,21 +368,21 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
@Override
@SuppressWarnings("unchecked")
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
- Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx);
+ CompletableFuture<Versioned<List<String>>> result = ((CompletableFuture<Versioned<List<String>>>) ctx);
if (KeeperException.Code.OK.intValue() == rc) {
/** cversion: the number of changes to the children of this znode **/
ZkVersion zkVersion = new ZkVersion(stat.getCversion());
- result.setValue(new Versioned(children, zkVersion));
+ result.complete(new Versioned(children, zkVersion));
} else if (KeeperException.Code.NONODE.intValue() == rc) {
- result.setException(new LogNotFoundException("Log " + path + " not found"));
+ result.completeExceptionally(new LogNotFoundException("Log " + path + " not found"));
} else {
- result.setException(new ZKException("Failed to get log segments from " + path,
+ result.completeExceptionally(new ZKException("Failed to get log segments from " + path,
KeeperException.Code.get(rc)));
}
}
@Override
- public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
+ public CompletableFuture<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
LogSegmentNamesListener listener) {
Watcher zkWatcher;
if (null == listener) {
@@ -422,9 +421,9 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
closeLock.readLock().unlock();
}
}
- Future<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
+ CompletableFuture<Versioned<List<String>>> getLogSegmentNamesResult = zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
if (null != listener) {
- getLogSegmentNamesResult.addEventListener(new ReadLogSegmentsTask(logSegmentsPath, this));
+ getLogSegmentNamesResult.whenComplete(new ReadLogSegmentsTask(logSegmentsPath, this));
}
return zkGetLogSegmentNames(logSegmentsPath, zkWatcher);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
index 551cc44..b3fe456 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/ZKMetadataAccessor.java
@@ -21,17 +21,16 @@ import java.io.IOException;
import java.net.URI;
import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.MetadataAccessor;
+import org.apache.distributedlog.api.MetadataAccessor;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
@@ -45,7 +44,7 @@ import static org.apache.distributedlog.impl.BKNamespaceDriver.getZKServersFromD
public class ZKMetadataAccessor implements MetadataAccessor {
static final Logger LOG = LoggerFactory.getLogger(ZKMetadataAccessor.class);
protected final String name;
- protected Promise<Void> closePromise;
+ protected CompletableFuture<Void> closePromise;
protected final URI uri;
// zookeeper clients
// NOTE: The actual zookeeper client is initialized lazily when it is referenced by
@@ -213,13 +212,13 @@ public class ZKMetadataAccessor implements MetadataAccessor {
* @return future represents the close result.
*/
@Override
- public Future<Void> asyncClose() {
- Promise<Void> closeFuture;
+ public CompletableFuture<Void> asyncClose() {
+ CompletableFuture<Void> closeFuture;
synchronized (this) {
if (null != closePromise) {
return closePromise;
}
- closeFuture = closePromise = new Promise<Void>();
+ closeFuture = closePromise = new CompletableFuture<Void>();
}
// NOTE: ownWriterZKC and ownReaderZKC are mostly used by tests
// the managers created by the namespace - whose zkc will be closed by namespace
@@ -233,13 +232,13 @@ public class ZKMetadataAccessor implements MetadataAccessor {
} catch (Exception e) {
LOG.warn("Exception while closing distributed log manager", e);
}
- FutureUtils.setValue(closeFuture, null);
+ FutureUtils.complete(closeFuture, null);
return closeFuture;
}
@Override
public void close() throws IOException {
- FutureUtils.result(asyncClose());
+ Utils.ioResult(asyncClose());
}
public synchronized void checkClosedOrInError(String operation) throws AlreadyClosedException {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java
index 63a81bd..e4a175c 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControl.java
@@ -19,10 +19,9 @@ package org.apache.distributedlog.impl.acl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
@@ -100,8 +99,8 @@ public class ZKAccessControl {
return accessControlEntry;
}
- public Future<ZKAccessControl> create(ZooKeeperClient zkc) {
- final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+ public CompletableFuture<ZKAccessControl> create(ZooKeeperClient zkc) {
+ final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
try {
zkc.get().create(zkPath, serialize(accessControlEntry), zkc.getDefaultACL(), CreateMode.PERSISTENT,
new AsyncCallback.StringCallback() {
@@ -109,48 +108,48 @@ public class ZKAccessControl {
public void processResult(int rc, String path, Object ctx, String name) {
if (KeeperException.Code.OK.intValue() == rc) {
ZKAccessControl.this.zkVersion = 0;
- promise.setValue(ZKAccessControl.this);
+ promise.complete(ZKAccessControl.this);
} else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+ promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (IOException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
return promise;
}
- public Future<ZKAccessControl> update(ZooKeeperClient zkc) {
- final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+ public CompletableFuture<ZKAccessControl> update(ZooKeeperClient zkc) {
+ final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
try {
zkc.get().setData(zkPath, serialize(accessControlEntry), zkVersion, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
ZKAccessControl.this.zkVersion = stat.getVersion();
- promise.setValue(ZKAccessControl.this);
+ promise.complete(ZKAccessControl.this);
} else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+ promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (IOException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
return promise;
}
- public static Future<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
- final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+ public static CompletableFuture<ZKAccessControl> read(final ZooKeeperClient zkc, final String zkPath, Watcher watcher) {
+ final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
try {
zkc.get().getData(zkPath, watcher, new AsyncCallback.DataCallback() {
@@ -159,25 +158,25 @@ public class ZKAccessControl {
if (KeeperException.Code.OK.intValue() == rc) {
try {
AccessControlEntry ace = deserialize(zkPath, data);
- promise.setValue(new ZKAccessControl(ace, zkPath, stat.getVersion()));
+ promise.complete(new ZKAccessControl(ace, zkPath, stat.getVersion()));
} catch (IOException ioe) {
- promise.setException(ioe);
+ promise.completeExceptionally(ioe);
}
} else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+ promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
return promise;
}
- public static Future<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
- final Promise<Void> promise = new Promise<Void>();
+ public static CompletableFuture<Void> delete(final ZooKeeperClient zkc, final String zkPath) {
+ final CompletableFuture<Void> promise = new CompletableFuture<Void>();
try {
zkc.get().delete(zkPath, -1, new AsyncCallback.VoidCallback() {
@@ -185,16 +184,16 @@ public class ZKAccessControl {
public void processResult(int rc, String path, Object ctx) {
if (KeeperException.Code.OK.intValue() == rc ||
KeeperException.Code.NONODE.intValue() == rc) {
- promise.setValue(null);
+ promise.complete(null);
} else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+ promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
return promise;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java
index be8db64..3dbde6a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/acl/ZKAccessControlManager.java
@@ -18,16 +18,15 @@
package org.apache.distributedlog.impl.acl;
import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -76,7 +75,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
this.scheduledExecutorService = scheduledExecutorService;
this.streamEntries = new ConcurrentHashMap<String, ZKAccessControl>();
try {
- Await.result(fetchDefaultAccessControlEntry());
+ FutureUtils.result(fetchDefaultAccessControlEntry());
} catch (Throwable t) {
if (t instanceof InterruptedException) {
throw new DLInterruptedException("Interrupted on getting default access control entry for " + zkRootPath, t);
@@ -90,7 +89,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
}
try {
- Await.result(fetchAccessControlEntries());
+ FutureUtils.result(fetchAccessControlEntries());
} catch (Throwable t) {
if (t instanceof InterruptedException) {
throw new DLInterruptedException("Interrupted on getting access control entries for " + zkRootPath, t);
@@ -140,19 +139,19 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
closed = true;
}
- private Future<Void> fetchAccessControlEntries() {
- final Promise<Void> promise = new Promise<Void>();
+ private CompletableFuture<Void> fetchAccessControlEntries() {
+ final CompletableFuture<Void> promise = new CompletableFuture<Void>();
fetchAccessControlEntries(promise);
return promise;
}
- private void fetchAccessControlEntries(final Promise<Void> promise) {
+ private void fetchAccessControlEntries(final CompletableFuture<Void> promise) {
try {
zkc.get().getChildren(zkRootPath, this, new AsyncCallback.Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
if (KeeperException.Code.OK.intValue() != rc) {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+ promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
return;
}
Set<String> streamsReceived = new HashSet<String>();
@@ -166,7 +165,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
}
}
if (streamsReceived.isEmpty()) {
- promise.setValue(null);
+ promise.complete(null);
return;
}
final AtomicInteger numPendings = new AtomicInteger(streamsReceived.size());
@@ -174,7 +173,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
for (String s : streamsReceived) {
final String streamName = s;
ZKAccessControl.read(zkc, zkRootPath + "/" + streamName, null)
- .addEventListener(new FutureEventListener<ZKAccessControl>() {
+ .whenComplete(new FutureEventListener<ZKAccessControl>() {
@Override
public void onSuccess(ZKAccessControl accessControl) {
@@ -193,7 +192,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
streamEntries.remove(streamName);
} else {
if (1 == numFailures.incrementAndGet()) {
- promise.setException(cause);
+ promise.completeExceptionally(cause);
}
}
complete();
@@ -201,7 +200,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
private void complete() {
if (0 == numPendings.decrementAndGet() && numFailures.get() == 0) {
- promise.setValue(null);
+ promise.complete(null);
}
}
});
@@ -209,28 +208,28 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
}
- private Future<ZKAccessControl> fetchDefaultAccessControlEntry() {
- final Promise<ZKAccessControl> promise = new Promise<ZKAccessControl>();
+ private CompletableFuture<ZKAccessControl> fetchDefaultAccessControlEntry() {
+ final CompletableFuture<ZKAccessControl> promise = new CompletableFuture<ZKAccessControl>();
fetchDefaultAccessControlEntry(promise);
return promise;
}
- private void fetchDefaultAccessControlEntry(final Promise<ZKAccessControl> promise) {
+ private void fetchDefaultAccessControlEntry(final CompletableFuture<ZKAccessControl> promise) {
ZKAccessControl.read(zkc, zkRootPath, this)
- .addEventListener(new FutureEventListener<ZKAccessControl>() {
+ .whenComplete(new FutureEventListener<ZKAccessControl>() {
@Override
public void onSuccess(ZKAccessControl accessControl) {
logger.info("Default Access Control will be changed from {} to {}",
ZKAccessControlManager.this.defaultAccessControl,
accessControl);
ZKAccessControlManager.this.defaultAccessControl = accessControl;
- promise.setValue(accessControl);
+ promise.complete(accessControl);
}
@Override
@@ -239,21 +238,21 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
logger.info("Default Access Control is missing, creating one for {} ...", zkRootPath);
createDefaultAccessControlEntryIfNeeded(promise);
} else {
- promise.setException(cause);
+ promise.completeExceptionally(cause);
}
}
});
}
- private void createDefaultAccessControlEntryIfNeeded(final Promise<ZKAccessControl> promise) {
+ private void createDefaultAccessControlEntryIfNeeded(final CompletableFuture<ZKAccessControl> promise) {
ZooKeeper zk;
try {
zk = zkc.get();
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
return;
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
return;
}
ZkUtils.asyncCreateFullPathOptimistic(zk, zkRootPath, new byte[0], zkc.getDefaultACL(),
@@ -264,7 +263,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
logger.info("Created zk path {} for default ACL.", zkRootPath);
fetchDefaultAccessControlEntry(promise);
} else {
- promise.setException(KeeperException.create(KeeperException.Code.get(rc)));
+ promise.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
}
}
}, null);
@@ -277,7 +276,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
- fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
+ fetchDefaultAccessControlEntry().whenComplete(new FutureEventListener<ZKAccessControl>() {
@Override
public void onSuccess(ZKAccessControl value) {
// no-op
@@ -305,7 +304,7 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
- fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
+ fetchAccessControlEntries().whenComplete(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
// no-op
@@ -328,10 +327,10 @@ public class ZKAccessControlManager implements AccessControlManager, Watcher {
scheduledExecutorService.schedule(new Runnable() {
@Override
public void run() {
- fetchDefaultAccessControlEntry().addEventListener(new FutureEventListener<ZKAccessControl>() {
+ fetchDefaultAccessControlEntry().whenComplete(new FutureEventListener<ZKAccessControl>() {
@Override
public void onSuccess(ZKAccessControl value) {
- fetchAccessControlEntries().addEventListener(new FutureEventListener<Void>() {
+ fetchAccessControlEntries().whenComplete(new FutureEventListener<Void>() {
@Override
public void onSuccess(Void value) {
// no-op
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
index 5d7af9d..17515c3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/federated/FederatedZKLogMetadataStore.java
@@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.callback.NamespaceListener;
@@ -32,12 +33,10 @@ import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.ZKNamespaceWatcher;
import org.apache.distributedlog.metadata.LogMetadataStore;
import org.apache.distributedlog.namespace.NamespaceWatcher;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -49,8 +48,6 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.net.URI;
@@ -80,8 +77,9 @@ import static com.google.common.base.Charsets.UTF_8;
* NOTE: the current federated namespace isn't optimized for deletion/creation, so don't use it in workloads
* that have lots of creations or deletions.
*/
-public class FederatedZKLogMetadataStore extends NamespaceWatcher implements LogMetadataStore, Watcher, Runnable,
- FutureEventListener<Set<URI>> {
+public class FederatedZKLogMetadataStore
+ extends NamespaceWatcher
+ implements LogMetadataStore, Watcher, Runnable, FutureEventListener<Set<URI>> {
static final Logger logger = LoggerFactory.getLogger(FederatedZKLogMetadataStore.class);
@@ -100,7 +98,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
* @throws KeeperException
*/
public static void createFederatedNamespace(URI namespace, ZooKeeperClient zkc)
- throws InterruptedException, ZooKeeperClient.ZooKeeperConnectionException, KeeperException {
+ throws IOException, KeeperException {
String zkSubNamespacesPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES;
Utils.zkCreateFullPathOptimistic(zkc, zkSubNamespacesPath, new byte[0],
zkc.getDefaultACL(), CreateMode.PERSISTENT);
@@ -112,7 +110,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
class SubNamespace implements NamespaceListener {
final URI uri;
final ZKNamespaceWatcher watcher;
- Promise<Set<String>> logsFuture = new Promise<Set<String>>();
+ CompletableFuture<Set<String>> logsFuture = new CompletableFuture<Set<String>>();
SubNamespace(URI uri) {
this.uri = uri;
@@ -124,7 +122,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
this.watcher.watchNamespaceChanges();
}
- synchronized Future<Set<String>> getLogs() {
+ synchronized CompletableFuture<Set<String>> getLogs() {
return logsFuture;
}
@@ -134,16 +132,16 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
Set<String> oldLogs = Sets.newHashSet();
// update the sub namespace cache
- Promise<Set<String>> newLogsPromise;
+ CompletableFuture<Set<String>> newLogsPromise;
synchronized (this) {
- if (logsFuture.isDefined()) { // the promise is already satisfied
+ if (logsFuture.isDone()) { // the promise is already satisfied
try {
oldLogs = FutureUtils.result(logsFuture);
- } catch (IOException e) {
+ } catch (Exception e) {
logger.error("Unexpected exception when getting logs from a satisified future of {} : ",
uri, e);
}
- logsFuture = new Promise<Set<String>>();
+ logsFuture = new CompletableFuture<Set<String>>();
}
// update the reverse cache
@@ -163,7 +161,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
}
newLogsPromise = logsFuture;
}
- newLogsPromise.setValue(newLogs);
+ newLogsPromise.complete(newLogs);
// notify namespace changes
notifyOnNamespaceChanges();
@@ -203,7 +201,16 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
this.maxLogsPerSubnamespace = conf.getFederatedMaxLogsPerSubnamespace();
// fetch the sub namespace
- Set<URI> uris = FutureUtils.result(fetchSubNamespaces(this));
+ Set<URI> uris;
+ try {
+ uris = FutureUtils.result(fetchSubNamespaces(this));
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ } else {
+ throw new IOException(e);
+ }
+ }
for (URI uri : uris) {
SubNamespace subNs = new SubNamespace(uri);
if (null == subNamespaces.putIfAbsent(uri, subNs)) {
@@ -228,21 +235,21 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
}
}
- private <T> Future<T> postStateCheck(Future<T> future) {
- final Promise<T> postCheckedPromise = new Promise<T>();
- future.addEventListener(new FutureEventListener<T>() {
+ private <T> CompletableFuture<T> postStateCheck(CompletableFuture<T> future) {
+ final CompletableFuture<T> postCheckedPromise = new CompletableFuture<T>();
+ future.whenComplete(new FutureEventListener<T>() {
@Override
public void onSuccess(T value) {
if (duplicatedLogFound.get()) {
- postCheckedPromise.setException(new UnexpectedException("Duplicate log found under " + namespace));
+ postCheckedPromise.completeExceptionally(new UnexpectedException("Duplicate log found under " + namespace));
} else {
- postCheckedPromise.setValue(value);
+ postCheckedPromise.complete(value);
}
}
@Override
public void onFailure(Throwable cause) {
- postCheckedPromise.setException(cause);
+ postCheckedPromise.completeExceptionally(cause);
}
});
return postCheckedPromise;
@@ -273,13 +280,13 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
namespace.getFragment());
}
- Future<Set<URI>> getCachedSubNamespaces() {
+ CompletableFuture<Set<URI>> getCachedSubNamespaces() {
Set<URI> nsSet = subNamespaces.keySet();
- return Future.value(nsSet);
+ return FutureUtils.value(nsSet);
}
- Future<Set<URI>> fetchSubNamespaces(final Watcher watcher) {
- final Promise<Set<URI>> promise = new Promise<Set<URI>>();
+ CompletableFuture<Set<URI>> fetchSubNamespaces(final Watcher watcher) {
+ final CompletableFuture<Set<URI>> promise = new CompletableFuture<Set<URI>>();
try {
zkc.get().sync(this.zkSubnamespacesPath, new AsyncCallback.VoidCallback() {
@Override
@@ -287,27 +294,27 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
if (Code.OK.intValue() == rc) {
fetchSubNamespaces(watcher, promise);
} else {
- promise.setException(KeeperException.create(Code.get(rc)));
+ promise.completeExceptionally(KeeperException.create(Code.get(rc)));
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
return promise;
}
private void fetchSubNamespaces(Watcher watcher,
- final Promise<Set<URI>> promise) {
+ final CompletableFuture<Set<URI>> promise) {
try {
zkc.get().getChildren(this.zkSubnamespacesPath, watcher,
new AsyncCallback.Children2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
if (Code.NONODE.intValue() == rc) {
- promise.setException(new UnexpectedException(
+ promise.completeExceptionally(new UnexpectedException(
"The subnamespaces don't exist for the federated namespace " + namespace));
} else if (Code.OK.intValue() == rc) {
Set<URI> subnamespaces = Sets.newHashSet();
@@ -318,26 +325,26 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
}
} catch (URISyntaxException use) {
logger.error("Invalid sub namespace uri found : ", use);
- promise.setException(new UnexpectedException(
+ promise.completeExceptionally(new UnexpectedException(
"Invalid sub namespace uri found in " + namespace, use));
return;
}
// update the sub namespaces set before updating the version
setZkSubnamespacesVersion(stat.getVersion());
- promise.setValue(subnamespaces);
+ promise.complete(subnamespaces);
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
}
@Override
public void run() {
- fetchSubNamespaces(this).addEventListener(this);
+ fetchSubNamespaces(this).whenComplete(this);
}
@Override
@@ -370,7 +377,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
}
if (Event.EventType.NodeChildrenChanged == watchedEvent.getType()) {
// fetch the namespace
- fetchSubNamespaces(this).addEventListener(this);
+ fetchSubNamespaces(this).whenComplete(this);
}
}
@@ -378,27 +385,27 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
// Log Related Methods
//
- private <A> Future<A> duplicatedLogException(String logName) {
- return Future.exception(new UnexpectedException("Duplicated log " + logName
+ private <A> CompletableFuture<A> duplicatedLogException(String logName) {
+ return FutureUtils.exception(new UnexpectedException("Duplicated log " + logName
+ " found in namespace " + namespace));
}
@Override
- public Future<URI> createLog(final String logName) {
+ public CompletableFuture<URI> createLog(final String logName) {
if (duplicatedLogFound.get()) {
return duplicatedLogException(duplicatedLogName.get());
}
- Promise<URI> createPromise = new Promise<URI>();
+ CompletableFuture<URI> createPromise = new CompletableFuture<URI>();
doCreateLog(logName, createPromise);
return postStateCheck(createPromise);
}
- void doCreateLog(final String logName, final Promise<URI> createPromise) {
- getLogLocation(logName).addEventListener(new FutureEventListener<Optional<URI>>() {
+ void doCreateLog(final String logName, final CompletableFuture<URI> createPromise) {
+ getLogLocation(logName).whenComplete(new FutureEventListener<Optional<URI>>() {
@Override
public void onSuccess(Optional<URI> uriOptional) {
if (uriOptional.isPresent()) {
- createPromise.setException(new LogExistsException("Log " + logName + " already exists in " + uriOptional.get()));
+ createPromise.completeExceptionally(new LogExistsException("Log " + logName + " already exists in " + uriOptional.get()));
} else {
getCachedSubNamespacesAndCreateLog(logName, createPromise);
}
@@ -406,14 +413,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
@Override
public void onFailure(Throwable cause) {
- createPromise.setException(cause);
+ createPromise.completeExceptionally(cause);
}
});
}
private void getCachedSubNamespacesAndCreateLog(final String logName,
- final Promise<URI> createPromise) {
- getCachedSubNamespaces().addEventListener(new FutureEventListener<Set<URI>>() {
+ final CompletableFuture<URI> createPromise) {
+ getCachedSubNamespaces().whenComplete(new FutureEventListener<Set<URI>>() {
@Override
public void onSuccess(Set<URI> uris) {
findSubNamespaceToCreateLog(logName, uris, createPromise);
@@ -421,14 +428,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
@Override
public void onFailure(Throwable cause) {
- createPromise.setException(cause);
+ createPromise.completeExceptionally(cause);
}
});
}
private void fetchSubNamespacesAndCreateLog(final String logName,
- final Promise<URI> createPromise) {
- fetchSubNamespaces(null).addEventListener(new FutureEventListener<Set<URI>>() {
+ final CompletableFuture<URI> createPromise) {
+ fetchSubNamespaces(null).whenComplete(new FutureEventListener<Set<URI>>() {
@Override
public void onSuccess(Set<URI> uris) {
findSubNamespaceToCreateLog(logName, uris, createPromise);
@@ -436,26 +443,26 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
@Override
public void onFailure(Throwable cause) {
- createPromise.setException(cause);
+ createPromise.completeExceptionally(cause);
}
});
}
private void findSubNamespaceToCreateLog(final String logName,
final Set<URI> uris,
- final Promise<URI> createPromise) {
+ final CompletableFuture<URI> createPromise) {
final List<URI> uriList = Lists.newArrayListWithExpectedSize(uris.size());
- List<Future<Set<String>>> futureList = Lists.newArrayListWithExpectedSize(uris.size());
+ List<CompletableFuture<Set<String>>> futureList = Lists.newArrayListWithExpectedSize(uris.size());
for (URI uri : uris) {
SubNamespace subNs = subNamespaces.get(uri);
if (null == subNs) {
- createPromise.setException(new UnexpectedException("No sub namespace " + uri + " found"));
+ createPromise.completeExceptionally(new UnexpectedException("No sub namespace " + uri + " found"));
return;
}
futureList.add(subNs.getLogs());
uriList.add(uri);
}
- Future.collect(futureList).addEventListener(new FutureEventListener<List<Set<String>>>() {
+ FutureUtils.collect(futureList).whenComplete(new FutureEventListener<List<Set<String>>>() {
@Override
public void onSuccess(List<Set<String>> resultList) {
for (int i = resultList.size() - 1; i >= 0; i--) {
@@ -467,7 +474,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
}
}
// All sub namespaces are full
- createSubNamespace().addEventListener(new FutureEventListener<URI>() {
+ createSubNamespace().whenComplete(new FutureEventListener<URI>() {
@Override
public void onSuccess(URI uri) {
// the new namespace will be propagated to the namespace cache by the namespace listener
@@ -479,14 +486,14 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
@Override
public void onFailure(Throwable cause) {
- createPromise.setException(cause);
+ createPromise.completeExceptionally(cause);
}
});
}
@Override
public void onFailure(Throwable cause) {
- createPromise.setException(cause);
+ createPromise.completeExceptionally(cause);
}
});
}
@@ -499,8 +506,8 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
return SUB_NAMESPACE_PREFIX + parts[parts.length - 1];
}
- Future<URI> createSubNamespace() {
- final Promise<URI> promise = new Promise<URI>();
+ CompletableFuture<URI> createSubNamespace() {
+ final CompletableFuture<URI> promise = new CompletableFuture<URI>();
final String nsPath = namespace.getPath() + "/" + ZNODE_SUB_NAMESPACES + "/" + SUB_NAMESPACE_PREFIX;
try {
@@ -512,21 +519,21 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
try {
URI newUri = getSubNamespaceURI(getNamespaceFromZkPath(name));
logger.info("Created sub namespace {}", newUri);
- promise.setValue(newUri);
+ promise.complete(newUri);
} catch (UnexpectedException ue) {
- promise.setException(ue);
+ promise.completeExceptionally(ue);
} catch (URISyntaxException e) {
- promise.setException(new UnexpectedException("Invalid namespace " + name + " is created."));
+ promise.completeExceptionally(new UnexpectedException("Invalid namespace " + name + " is created."));
}
} else {
- promise.setException(KeeperException.create(Code.get(rc)));
+ promise.completeExceptionally(KeeperException.create(Code.get(rc)));
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
} catch (InterruptedException e) {
- promise.setException(e);
+ promise.completeExceptionally(e);
}
return promise;
@@ -545,22 +552,22 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
*/
private void createLogInNamespace(final URI uri,
final String logName,
- final Promise<URI> createPromise) {
+ final CompletableFuture<URI> createPromise) {
// TODO: rewrite this after we bump to zk 3.5, where we will have an asynchronous version of multi
scheduler.submit(new Runnable() {
@Override
public void run() {
try {
createLogInNamespaceSync(uri, logName);
- createPromise.setValue(uri);
+ createPromise.complete(uri);
} catch (InterruptedException e) {
- createPromise.setException(e);
+ createPromise.completeExceptionally(e);
} catch (IOException e) {
- createPromise.setException(e);
+ createPromise.completeExceptionally(e);
} catch (KeeperException.BadVersionException bve) {
fetchSubNamespacesAndCreateLog(logName, createPromise);
} catch (KeeperException e) {
- createPromise.setException(e);
+ createPromise.completeExceptionally(e);
}
}
});
@@ -617,39 +624,35 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
}
@Override
- public Future<Optional<URI>> getLogLocation(final String logName) {
+ public CompletableFuture<Optional<URI>> getLogLocation(final String logName) {
if (duplicatedLogFound.get()) {
return duplicatedLogException(duplicatedLogName.get());
}
URI location = log2Locations.get(logName);
if (null != location) {
- return postStateCheck(Future.value(Optional.of(location)));
+ return postStateCheck(FutureUtils.value(Optional.of(location)));
}
if (!forceCheckLogExistence) {
Optional<URI> result = Optional.absent();
- return Future.value(result);
+ return FutureUtils.value(result);
}
- return postStateCheck(fetchLogLocation(logName).onSuccess(
- new AbstractFunction1<Optional<URI>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Optional<URI> uriOptional) {
- if (uriOptional.isPresent()) {
- log2Locations.putIfAbsent(logName, uriOptional.get());
- }
- return BoxedUnit.UNIT;
- }
- }));
+ return postStateCheck(fetchLogLocation(logName).thenApply((uriOptional) -> {
+ if (uriOptional.isPresent()) {
+ log2Locations.putIfAbsent(logName, uriOptional.get());
+ }
+ return uriOptional;
+ }));
}
- private Future<Optional<URI>> fetchLogLocation(final String logName) {
- final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>();
+ private CompletableFuture<Optional<URI>> fetchLogLocation(final String logName) {
+ final CompletableFuture<Optional<URI>> fetchPromise = new CompletableFuture<Optional<URI>>();
Set<URI> uris = subNamespaces.keySet();
- List<Future<Optional<URI>>> fetchFutures = Lists.newArrayListWithExpectedSize(uris.size());
+ List<CompletableFuture<Optional<URI>>> fetchFutures = Lists.newArrayListWithExpectedSize(uris.size());
for (URI uri : uris) {
fetchFutures.add(fetchLogLocation(uri, logName));
}
- Future.collect(fetchFutures).addEventListener(new FutureEventListener<List<Optional<URI>>>() {
+ FutureUtils.collect(fetchFutures).whenComplete(new FutureEventListener<List<Optional<URI>>>() {
@Override
public void onSuccess(List<Optional<URI>> fetchResults) {
Optional<URI> result = Optional.absent();
@@ -660,7 +663,7 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
new Object[] { logName, result.get(), fetchResult.get() });
duplicatedLogName.compareAndSet(null, logName);
duplicatedLogFound.set(true);
- fetchPromise.setException(new UnexpectedException("Log " + logName
+ fetchPromise.completeExceptionally(new UnexpectedException("Log " + logName
+ " is found in multiple sub namespaces : "
+ result.get() + " & " + fetchResult.get()));
return;
@@ -669,62 +672,57 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
result = fetchResult;
}
}
- fetchPromise.setValue(result);
+ fetchPromise.complete(result);
}
@Override
public void onFailure(Throwable cause) {
- fetchPromise.setException(cause);
+ fetchPromise.completeExceptionally(cause);
}
});
return fetchPromise;
}
- private Future<Optional<URI>> fetchLogLocation(final URI uri, String logName) {
- final Promise<Optional<URI>> fetchPromise = new Promise<Optional<URI>>();
+ private CompletableFuture<Optional<URI>> fetchLogLocation(final URI uri, String logName) {
+ final CompletableFuture<Optional<URI>> fetchPromise = new CompletableFuture<Optional<URI>>();
final String logRootPath = uri.getPath() + "/" + logName;
try {
zkc.get().exists(logRootPath, false, new AsyncCallback.StatCallback() {
@Override
public void processResult(int rc, String path, Object ctx, Stat stat) {
if (Code.OK.intValue() == rc) {
- fetchPromise.setValue(Optional.of(uri));
+ fetchPromise.complete(Optional.of(uri));
} else if (Code.NONODE.intValue() == rc) {
- fetchPromise.setValue(Optional.<URI>absent());
+ fetchPromise.complete(Optional.<URI>absent());
} else {
- fetchPromise.setException(KeeperException.create(Code.get(rc)));
+ fetchPromise.completeExceptionally(KeeperException.create(Code.get(rc)));
}
}
}, null);
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- fetchPromise.setException(e);
+ fetchPromise.completeExceptionally(e);
} catch (InterruptedException e) {
- fetchPromise.setException(e);
+ fetchPromise.completeExceptionally(e);
}
return fetchPromise;
}
@Override
- public Future<Iterator<String>> getLogs() {
+ public CompletableFuture<Iterator<String>> getLogs() {
if (duplicatedLogFound.get()) {
return duplicatedLogException(duplicatedLogName.get());
}
- return postStateCheck(retrieveLogs().map(
- new AbstractFunction1<List<Set<String>>, Iterator<String>>() {
- @Override
- public Iterator<String> apply(List<Set<String>> resultList) {
- return getIterator(resultList);
- }
- }));
+ return postStateCheck(retrieveLogs().thenApply(
+ resultList -> getIterator(resultList)));
}
- private Future<List<Set<String>>> retrieveLogs() {
+ private CompletableFuture<List<Set<String>>> retrieveLogs() {
Collection<SubNamespace> subNss = subNamespaces.values();
- List<Future<Set<String>>> logsList = Lists.newArrayListWithExpectedSize(subNss.size());
+ List<CompletableFuture<Set<String>>> logsList = Lists.newArrayListWithExpectedSize(subNss.size());
for (SubNamespace subNs : subNss) {
logsList.add(subNs.getLogs());
}
- return Future.collect(logsList);
+ return FutureUtils.collect(logsList);
}
private Iterator<String> getIterator(List<Set<String>> resultList) {
@@ -747,13 +745,9 @@ public class FederatedZKLogMetadataStore extends NamespaceWatcher implements Log
}
private void notifyOnNamespaceChanges() {
- retrieveLogs().onSuccess(new AbstractFunction1<List<Set<String>>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(List<Set<String>> resultList) {
- for (NamespaceListener listener : listeners) {
- listener.onStreamsChanged(getIterator(resultList));
- }
- return BoxedUnit.UNIT;
+ retrieveLogs().thenAccept(resultList -> {
+ for (NamespaceListener listener : listeners) {
+ listener.onStreamsChanged(getIterator(resultList));
}
});
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
index 8f9913e..e45c755 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentAllocator.java
@@ -17,25 +17,23 @@
*/
package org.apache.distributedlog.impl.logsegment;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import org.apache.distributedlog.bk.LedgerAllocator;
import org.apache.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.distributedlog.util.Allocator;
import org.apache.distributedlog.util.Transaction;
-import com.twitter.util.Future;
import org.apache.bookkeeper.client.LedgerHandle;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-
-import java.io.IOException;
/**
* Allocate log segments
*/
class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object> {
- private static class NewLogSegmentEntryWriterFn extends AbstractFunction1<LedgerHandle, LogSegmentEntryWriter> {
+ private static class NewLogSegmentEntryWriterFn implements Function<LedgerHandle, LogSegmentEntryWriter> {
- static final Function1<LedgerHandle, LogSegmentEntryWriter> INSTANCE =
+ static final Function<LedgerHandle, LogSegmentEntryWriter> INSTANCE =
new NewLogSegmentEntryWriterFn();
private NewLogSegmentEntryWriterFn() {}
@@ -58,8 +56,8 @@ class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object>
}
@Override
- public Future<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn,
- final Transaction.OpListener<LogSegmentEntryWriter> listener) {
+ public CompletableFuture<LogSegmentEntryWriter> tryObtain(Transaction<Object> txn,
+ final Transaction.OpListener<LogSegmentEntryWriter> listener) {
return allocator.tryObtain(txn, new Transaction.OpListener<LedgerHandle>() {
@Override
public void onCommit(LedgerHandle lh) {
@@ -70,16 +68,16 @@ class BKLogSegmentAllocator implements Allocator<LogSegmentEntryWriter, Object>
public void onAbort(Throwable t) {
listener.onAbort(t);
}
- }).map(NewLogSegmentEntryWriterFn.INSTANCE);
+ }).thenApply(NewLogSegmentEntryWriterFn.INSTANCE);
}
@Override
- public Future<Void> asyncClose() {
+ public CompletableFuture<Void> asyncClose() {
return allocator.asyncClose();
}
@Override
- public Future<Void> delete() {
+ public CompletableFuture<Void> delete() {
return allocator.delete();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
index 034b23e..0bb91ae 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/logsegment/BKLogSegmentEntryReader.java
@@ -19,6 +19,7 @@ package org.apache.distributedlog.impl.logsegment;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.Entry;
import org.apache.distributedlog.LogSegmentMetadata;
@@ -29,10 +30,8 @@ import org.apache.distributedlog.exceptions.EndOfLogSegmentException;
import org.apache.distributedlog.exceptions.ReadCancelledException;
import org.apache.distributedlog.injector.AsyncFailureInjector;
import org.apache.distributedlog.logsegment.LogSegmentEntryReader;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -87,7 +86,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
return done;
}
- void setValue(LedgerEntry entry) {
+ void complete(LedgerEntry entry) {
synchronized (this) {
if (done) {
return;
@@ -98,7 +97,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
setDone(true);
}
- void setException(int rc) {
+ void completeExceptionally(int rc) {
synchronized (this) {
if (done) {
return;
@@ -152,16 +151,16 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
while (entries.hasMoreElements()) {
// more entries are returned
if (null != entry) {
- setException(BKException.Code.UnexpectedConditionException);
+ completeExceptionally(BKException.Code.UnexpectedConditionException);
return;
}
entry = entries.nextElement();
}
if (null == entry || entry.getEntryId() != entryId) {
- setException(BKException.Code.UnexpectedConditionException);
+ completeExceptionally(BKException.Code.UnexpectedConditionException);
return;
}
- setValue(entry);
+ complete(entry);
}
@Override
@@ -186,7 +185,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
return;
}
if (null != entry && this.entryId == entryId) {
- setValue(entry);
+ complete(entry);
return;
}
// the long poll is timeout or interrupted; we will retry it again.
@@ -215,7 +214,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
nextReadBackoffTime,
TimeUnit.MILLISECONDS);
} else {
- setException(rc);
+ completeExceptionally(rc);
}
return false;
}
@@ -229,7 +228,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
private class PendingReadRequest {
private final int numEntries;
private final List<Entry.Reader> entries;
- private final Promise<List<Entry.Reader>> promise;
+ private final CompletableFuture<List<Entry.Reader>> promise;
PendingReadRequest(int numEntries) {
this.numEntries = numEntries;
@@ -238,15 +237,15 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
} else {
this.entries = new ArrayList<Entry.Reader>();
}
- this.promise = new Promise<List<Entry.Reader>>();
+ this.promise = new CompletableFuture<List<Entry.Reader>>();
}
- Promise<List<Entry.Reader>> getPromise() {
+ CompletableFuture<List<Entry.Reader>> getPromise() {
return promise;
}
- void setException(Throwable throwable) {
- FutureUtils.setException(promise, throwable);
+ void completeExceptionally(Throwable throwable) {
+ FutureUtils.completeExceptionally(promise, throwable);
}
void addEntry(Entry.Reader entry) {
@@ -254,7 +253,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
}
void complete() {
- FutureUtils.setValue(promise, entries);
+ FutureUtils.complete(promise, entries);
onEntriesConsumed(entries.size());
}
@@ -277,7 +276,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
private final int numPrefetchEntries;
private final int maxPrefetchEntries;
// state
- private Promise<Void> closePromise = null;
+ private CompletableFuture<Void> closePromise = null;
private LogSegmentMetadata metadata;
private LedgerHandle lh;
private final List<LedgerHandle> openLedgerHandles;
@@ -457,7 +456,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
if (isBeyondLastAddConfirmed()) {
// if the reader is already caught up, let's fail the reader immediately
// as we need to pull the latest metadata of this log segment.
- setException(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc),
+ completeExceptionally(new BKTransmitException("Failed to open ledger for reading log segment " + getSegment(), rc),
true);
return;
}
@@ -488,7 +487,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
* @param throwable exception indicating the error
* @param isBackground whether the exception is being set by background reads (as opposed to foreground reads)
*/
- private void setException(Throwable throwable, boolean isBackground) {
+ private void completeExceptionally(Throwable throwable, boolean isBackground) {
lastException.compareAndSet(null, throwable);
if (isBackground) {
notifyReaders();
@@ -510,7 +509,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
readQueue.clear();
}
for (PendingReadRequest request : requestsToCancel) {
- request.setException(throwExc);
+ request.completeExceptionally(throwExc);
}
}
@@ -630,11 +629,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
}
@Override
- public Future<List<Entry.Reader>> readNext(int numEntries) {
+ public CompletableFuture<List<Entry.Reader>> readNext(int numEntries) {
final PendingReadRequest readRequest = new PendingReadRequest(numEntries);
if (checkClosedOrInError()) {
- readRequest.setException(lastException.get());
+ readRequest.completeExceptionally(lastException.get());
} else {
boolean wasQueueEmpty;
synchronized (readQueue) {
@@ -682,9 +681,9 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
// mark the reader in error and abort all pending reads since
// we don't know the last consumed read
if (null == lastException.get()) {
- if (nextRequest.getPromise().isInterrupted().isDefined()) {
- setException(new DLInterruptedException("Interrupted on reading log segment "
- + getSegment() + " : " + nextRequest.getPromise().isInterrupted().get()), false);
+ if (nextRequest.getPromise().isCancelled()) {
+ completeExceptionally(new DLInterruptedException("Interrupted on reading log segment "
+ + getSegment() + " : read request was cancelled"), false);
}
}
@@ -707,11 +706,11 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
} else {
DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
+ getSegment());
- nextRequest.setException(ise);
+ nextRequest.completeExceptionally(ise);
if (null != request) {
- request.setException(ise);
+ request.completeExceptionally(ise);
}
- setException(ise, false);
+ completeExceptionally(ise, false);
}
} else {
if (0 == scheduleCountLocal) {
@@ -732,7 +731,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
}
// reach end of log segment
if (hitEndOfLogSegment) {
- setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
+ completeExceptionally(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
return;
}
if (null == entry) {
@@ -742,7 +741,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
if (!entry.isDone()) {
// we already reached end of the log segment
if (isEndOfLogSegment(entry.getEntryId())) {
- setException(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
+ completeExceptionally(new EndOfLogSegmentException(getSegment().getZNodeName()), false);
}
return;
}
@@ -751,13 +750,13 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
if (entry != removedEntry) {
DLIllegalStateException ise = new DLIllegalStateException("Unexpected condition at reading from "
+ getSegment());
- setException(ise, false);
+ completeExceptionally(ise, false);
return;
}
try {
nextRequest.addEntry(processReadEntry(entry.getEntry()));
} catch (IOException e) {
- setException(e, false);
+ completeExceptionally(e, false);
return;
}
} else if (skipBrokenEntries && BKException.Code.DigestMatchException == entry.getRc()) {
@@ -766,7 +765,7 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
readAheadEntries.poll();
continue;
} else {
- setException(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId()
+ completeExceptionally(new BKTransmitException("Encountered issue on reading entry " + entry.getEntryId()
+ " @ log segment " + getSegment(), entry.getRc()), false);
return;
}
@@ -812,26 +811,29 @@ public class BKLogSegmentEntryReader implements Runnable, LogSegmentEntryReader,
}
@Override
- public Future<Void> asyncClose() {
- final Promise<Void> closeFuture;
+ public CompletableFuture<Void> asyncClose() {
+ final CompletableFuture<Void> closeFuture;
ReadCancelledException exception;
LedgerHandle[] lhsToClose;
synchronized (this) {
if (null != closePromise) {
return closePromise;
}
- closeFuture = closePromise = new Promise<Void>();
+ closeFuture = closePromise = new CompletableFuture<Void>();
lhsToClose = openLedgerHandles.toArray(new LedgerHandle[openLedgerHandles.size()]);
// set the exception to cancel pending and subsequent reads
exception = new ReadCancelledException(getSegment().getZNodeName(), "Reader was closed");
- setException(exception, false);
+ completeExceptionally(exception, false);
}
// cancel all pending reads
cancelAllPendingReads(exception);
// close all the open ledger
- BKUtils.closeLedgers(lhsToClose).proxyTo(closeFuture);
+ FutureUtils.proxyTo(
+ BKUtils.closeLedgers(lhsToClose),
+ closeFuture
+ );
return closeFuture;
}
}