You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:46 UTC
[14/23] incubator-distributedlog git commit: DL-124: Use Java8 Future
rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
new file mode 100644
index 0000000..45dc021
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/NamespaceBuilder.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api.namespace;
+
+import com.google.common.base.Preconditions;
+import org.apache.distributedlog.BKDistributedLogNamespace;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
+import org.apache.distributedlog.feature.CoreFeatureKeys;
+import org.apache.distributedlog.injector.AsyncFailureInjector;
+import org.apache.distributedlog.injector.AsyncRandomFailureInjector;
+import org.apache.distributedlog.namespace.NamespaceDriver;
+import org.apache.distributedlog.namespace.NamespaceDriverManager;
+import org.apache.distributedlog.util.ConfUtils;
+import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.common.util.PermitLimiter;
+import org.apache.distributedlog.util.SimplePermitLimiter;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.feature.FeatureProvider;
+import org.apache.bookkeeper.feature.SettableFeatureProvider;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+/**
+ * Builder to construct a <code>Namespace</code>.
+ * The builder takes the responsibility of loading backend according to the uri.
+ *
+ * @see Namespace
+ * @since 0.3.32
+ */
+public class NamespaceBuilder {
+
+ private static final Logger logger = LoggerFactory.getLogger(NamespaceBuilder.class);
+
+ public static NamespaceBuilder newBuilder() {
+ return new NamespaceBuilder();
+ }
+
+ private DistributedLogConfiguration _conf = null;
+ private DynamicDistributedLogConfiguration _dynConf = null;
+ private URI _uri = null;
+ private StatsLogger _statsLogger = NullStatsLogger.INSTANCE;
+ private StatsLogger _perLogStatsLogger = NullStatsLogger.INSTANCE;
+ private FeatureProvider _featureProvider = null;
+ private String _clientId = DistributedLogConstants.UNKNOWN_CLIENT_ID;
+ private int _regionId = DistributedLogConstants.LOCAL_REGION_ID;
+
+ // private constructor
+ private NamespaceBuilder() {}
+
+ /**
+ * DistributedLog Configuration used for the namespace.
+ *
+ * @param conf
+ * distributedlog configuration
+ * @return namespace builder.
+ */
+ public NamespaceBuilder conf(DistributedLogConfiguration conf) {
+ this._conf = conf;
+ return this;
+ }
+
+ /**
+ * Dynamic DistributedLog Configuration used for the namespace
+ *
+ * @param dynConf dynamic distributedlog configuration
+ * @return namespace builder
+ */
+ public NamespaceBuilder dynConf(DynamicDistributedLogConfiguration dynConf) {
+ this._dynConf = dynConf;
+ return this;
+ }
+
+ /**
+ * Namespace Location.
+ *
+ * @param uri
+ * namespace location uri.
+ * @see Namespace
+ * @return namespace builder.
+ */
+ public NamespaceBuilder uri(URI uri) {
+ this._uri = uri;
+ return this;
+ }
+
+ /**
+ * Stats Logger used for stats collection
+ *
+ * @param statsLogger
+ * stats logger
+ * @return namespace builder.
+ */
+ public NamespaceBuilder statsLogger(StatsLogger statsLogger) {
+ this._statsLogger = statsLogger;
+ return this;
+ }
+
+ /**
+ * Stats Logger used for collecting per log stats.
+ *
+ * @param statsLogger
+ * stats logger for collecting per log stats
+ * @return namespace builder.
+ */
+ public NamespaceBuilder perLogStatsLogger(StatsLogger statsLogger) {
+ this._perLogStatsLogger = statsLogger;
+ return this;
+ }
+
+ /**
+ * Feature provider used to control the availabilities of features in the namespace.
+ *
+ * @param featureProvider
+ * feature provider to control availabilities of features.
+ * @return namespace builder.
+ */
+ public NamespaceBuilder featureProvider(FeatureProvider featureProvider) {
+ this._featureProvider = featureProvider;
+ return this;
+ }
+
+ /**
+ * Client Id used for accessing the namespace
+ *
+ * @param clientId
+ * client id used for accessing the namespace
+ * @return namespace builder.
+ */
+ public NamespaceBuilder clientId(String clientId) {
+ this._clientId = clientId;
+ return this;
+ }
+
+ /**
+ * Region Id used for encoding logs in the namespace. The region id
+ * is useful when the namespace is globally spanning over regions.
+ *
+ * @param regionId
+ * region id.
+ * @return namespace builder.
+ */
+ public NamespaceBuilder regionId(int regionId) {
+ this._regionId = regionId;
+ return this;
+ }
+
+ @SuppressWarnings("deprecation")
+ private static StatsLogger normalizePerLogStatsLogger(StatsLogger statsLogger,
+ StatsLogger perLogStatsLogger,
+ DistributedLogConfiguration conf) {
+ StatsLogger normalizedPerLogStatsLogger = perLogStatsLogger;
+ if (perLogStatsLogger == NullStatsLogger.INSTANCE &&
+ conf.getEnablePerStreamStat()) {
+ normalizedPerLogStatsLogger = statsLogger.scope("stream");
+ }
+ return normalizedPerLogStatsLogger;
+ }
+
+ /**
+ * Build the namespace.
+ *
+ * @return the namespace instance.
+ * @throws IllegalArgumentException when there is illegal argument provided in the builder
+ * @throws NullPointerException when there is null argument provided in the builder
+ * @throws IOException when fail to build the backend
+ */
+ public Namespace build()
+ throws IllegalArgumentException, NullPointerException, IOException {
+ // Check arguments
+ Preconditions.checkNotNull(_conf, "No DistributedLog Configuration.");
+ Preconditions.checkNotNull(_uri, "No DistributedLog URI");
+
+ // validate the configuration
+ _conf.validate();
+ if (null == _dynConf) {
+ _dynConf = ConfUtils.getConstDynConf(_conf);
+ }
+
+ // retrieve the namespace driver
+ NamespaceDriver driver = NamespaceDriverManager.getDriver(_uri);
+ URI normalizedUri = DLUtils.normalizeURI(_uri);
+
+ // build the feature provider
+ FeatureProvider featureProvider;
+ if (null == _featureProvider) {
+ featureProvider = new SettableFeatureProvider("", 0);
+ logger.info("No feature provider is set. All features are disabled now.");
+ } else {
+ featureProvider = _featureProvider;
+ }
+
+ // build the failure injector
+ AsyncFailureInjector failureInjector = AsyncRandomFailureInjector.newBuilder()
+ .injectDelays(_conf.getEIInjectReadAheadDelay(),
+ _conf.getEIInjectReadAheadDelayPercent(),
+ _conf.getEIInjectMaxReadAheadDelayMs())
+ .injectErrors(false, 10)
+ .injectStops(_conf.getEIInjectReadAheadStall(), 10)
+ .injectCorruption(_conf.getEIInjectReadAheadBrokenEntries())
+ .build();
+
+ // normalize the per log stats logger
+ StatsLogger perLogStatsLogger = normalizePerLogStatsLogger(_statsLogger, _perLogStatsLogger, _conf);
+
+ // build the scheduler
+ OrderedScheduler scheduler = OrderedScheduler.newBuilder()
+ .name("DLM-" + normalizedUri.getPath())
+ .corePoolSize(_conf.getNumWorkerThreads())
+ .build();
+
+ // initialize the namespace driver
+ driver.initialize(
+ _conf,
+ _dynConf,
+ normalizedUri,
+ scheduler,
+ featureProvider,
+ failureInjector,
+ _statsLogger,
+ perLogStatsLogger,
+ DLUtils.normalizeClientId(_clientId),
+ _regionId);
+
+ // initialize the write limiter
+ PermitLimiter writeLimiter;
+ if (_conf.getGlobalOutstandingWriteLimit() < 0) {
+ writeLimiter = PermitLimiter.NULL_PERMIT_LIMITER;
+ } else {
+ Feature disableWriteLimitFeature = featureProvider.getFeature(
+ CoreFeatureKeys.DISABLE_WRITE_LIMIT.name().toLowerCase());
+ writeLimiter = new SimplePermitLimiter(
+ _conf.getOutstandingWriteLimitDarkmode(),
+ _conf.getGlobalOutstandingWriteLimit(),
+ _statsLogger.scope("writeLimiter"),
+ true /* singleton */,
+ disableWriteLimitFeature);
+ }
+
+ return new BKDistributedLogNamespace(
+ _conf,
+ normalizedUri,
+ driver,
+ scheduler,
+ featureProvider,
+ writeLimiter,
+ failureInjector,
+ _statsLogger,
+ perLogStatsLogger,
+ DLUtils.normalizeClientId(_clientId),
+ _regionId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java
new file mode 100644
index 0000000..fa8f288
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Namespace API and the builder to build namespace instance.
+ */
+package org.apache.distributedlog.api.namespace;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java
new file mode 100644
index 0000000..eca11fd
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/package-info.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * DistributedLog NEW API.
+ *
+ * <p>This is the new Java8 {@link java.util.concurrent.CompletableFuture} based API. It is
+ * <strong>experimental</strong> and still under developing.
+ */
+package org.apache.distributedlog.api;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java
new file mode 100644
index 0000000..bf4a8d3
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionStateStore.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api.subscription;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.DLSN;
+
+public interface SubscriptionStateStore extends Closeable {
+ /**
+ * Get the last committed position stored for this subscription
+ *
+ * @return future represents the last commit position
+ */
+ public CompletableFuture<DLSN> getLastCommitPosition();
+
+ /**
+ * Advances the position associated with the subscriber
+ *
+ * @param newPosition - new commit position
+ * @return future represents the advance result
+ */
+ public CompletableFuture<Void> advanceCommitPosition(DLSN newPosition);
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java
new file mode 100644
index 0000000..b6a0ed1
--- /dev/null
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/subscription/SubscriptionsStore.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.api.subscription;
+
+import java.io.Closeable;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.DLSN;
+
+/**
+ * Store to manage subscriptions
+ */
+public interface SubscriptionsStore extends Closeable {
+
+ /**
+ * Get the last committed position stored for <i>subscriberId</i>.
+ *
+ * @param subscriberId
+ * subscriber id
+ * @return future representing last committed position.
+ */
+ public CompletableFuture<DLSN> getLastCommitPosition(String subscriberId);
+
+ /**
+ * Get the last committed positions for all subscribers.
+ *
+ * @return future representing last committed positions for all subscribers.
+ */
+ public CompletableFuture<Map<String, DLSN>> getLastCommitPositions();
+
+ /**
+ * Advance the last committed position for <i>subscriberId</i>.
+ *
+ * @param subscriberId
+ * subscriber id.
+ * @param newPosition
+ * new committed position.
+ * @return future representing advancing result.
+ */
+ public CompletableFuture<Void> advanceCommitPosition(String subscriberId, DLSN newPosition);
+
+ /**
+ * Delete the subscriber <i>subscriberId</i> permanently. Once the subscriber is deleted, all the
+ * data stored under this subscriber will be lost.
+ * @param subscriberId subscriber id
+ * @return future represent success or failure.
+ * return true only if there's such subscriber and we removed it successfully.
+ * return false if there's no such subscriber, or we failed to remove.
+ */
+ public CompletableFuture<Boolean> deleteSubscriber(String subscriberId);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java b/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
index 56a4f2e..34011b5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/auditor/DLAuditor.java
@@ -20,20 +20,20 @@ package org.apache.distributedlog.auditor;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.SettableFuture;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.LogSegmentMetadata;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.impl.BKNamespaceDriver;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.namespace.NamespaceDriver;
import org.apache.distributedlog.util.DLUtils;
import org.apache.bookkeeper.client.BKException;
@@ -45,6 +45,7 @@ import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.RetryPolicy;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
@@ -84,13 +85,13 @@ public class DLAuditor {
this.conf = conf;
}
- private ZooKeeperClient getZooKeeperClient(DistributedLogNamespace namespace) {
+ private ZooKeeperClient getZooKeeperClient(Namespace namespace) {
NamespaceDriver driver = namespace.getNamespaceDriver();
assert(driver instanceof BKNamespaceDriver);
return ((BKNamespaceDriver) driver).getWriterZKC();
}
- private BookKeeperClient getBookKeeperClient(DistributedLogNamespace namespace) {
+ private BookKeeperClient getBookKeeperClient(Namespace namespace) {
NamespaceDriver driver = namespace.getNamespaceDriver();
assert(driver instanceof BKNamespaceDriver);
return ((BKNamespaceDriver) driver).getReaderBKC();
@@ -169,7 +170,7 @@ public class DLAuditor {
LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
final Set<Long> ledgers = new HashSet<Long>();
- final SettableFuture<Void> doneFuture = SettableFuture.create();
+ final CompletableFuture<Void> doneFuture = FutureUtils.createFuture();
BookkeeperInternalCallbacks.Processor<Long> collector =
new BookkeeperInternalCallbacks.Processor<Long>() {
@@ -195,9 +196,9 @@ public class DLAuditor {
@Override
public void processResult(int rc, String path, Object ctx) {
if (BKException.Code.OK == rc) {
- doneFuture.set(null);
+ doneFuture.complete(null);
} else {
- doneFuture.setException(BKException.create(rc));
+ doneFuture.completeExceptionally(BKException.create(rc));
}
}
};
@@ -225,12 +226,12 @@ public class DLAuditor {
private Set<Long> collectLedgersFromDL(List<URI> uris, List<List<String>> allocationPaths)
throws IOException {
final Set<Long> ledgers = new TreeSet<Long>();
- List<DistributedLogNamespace> namespaces =
- new ArrayList<DistributedLogNamespace>(uris.size());
+ List<Namespace> namespaces =
+ new ArrayList<Namespace>(uris.size());
try {
for (URI uri : uris) {
namespaces.add(
- DistributedLogNamespaceBuilder.newBuilder()
+ NamespaceBuilder.newBuilder()
.conf(conf)
.uri(uri)
.build());
@@ -240,8 +241,8 @@ public class DLAuditor {
ExecutorService executor = Executors.newFixedThreadPool(uris.size());
try {
int i = 0;
- for (final DistributedLogNamespace namespace : namespaces) {
- final DistributedLogNamespace dlNamespace = namespace;
+ for (final Namespace namespace : namespaces) {
+ final Namespace dlNamespace = namespace;
final URI uri = uris.get(i);
final List<String> aps = allocationPaths.get(i);
i++;
@@ -278,7 +279,7 @@ public class DLAuditor {
executor.shutdown();
}
} finally {
- for (DistributedLogNamespace namespace : namespaces) {
+ for (Namespace namespace : namespaces) {
namespace.close();
}
}
@@ -286,7 +287,7 @@ public class DLAuditor {
}
private void collectLedgersFromAllocator(final URI uri,
- final DistributedLogNamespace namespace,
+ final Namespace namespace,
final List<String> allocationPaths,
final Set<Long> ledgers) throws IOException {
final LinkedBlockingQueue<String> poolQueue =
@@ -346,7 +347,7 @@ public class DLAuditor {
}
private void collectLedgersFromDL(final URI uri,
- final DistributedLogNamespace namespace,
+ final Namespace namespace,
final Set<Long> ledgers) throws IOException {
logger.info("Enumerating {} to collect streams.", uri);
Iterator<String> streams = namespace.getLogs();
@@ -366,7 +367,7 @@ public class DLAuditor {
});
}
- private List<Long> collectLedgersFromStream(DistributedLogNamespace namespace,
+ private List<Long> collectLedgersFromStream(Namespace namespace,
String stream,
Set<Long> ledgers)
throws IOException {
@@ -394,7 +395,7 @@ public class DLAuditor {
*/
public Map<String, Long> calculateStreamSpaceUsage(final URI uri) throws IOException {
logger.info("Collecting stream space usage for {}.", uri);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(conf)
.uri(uri)
.build();
@@ -406,7 +407,7 @@ public class DLAuditor {
}
private Map<String, Long> calculateStreamSpaceUsage(
- final URI uri, final DistributedLogNamespace namespace)
+ final URI uri, final Namespace namespace)
throws IOException {
Iterator<String> streams = namespace.getLogs();
final LinkedBlockingQueue<String> streamQueue = new LinkedBlockingQueue<String>();
@@ -432,7 +433,7 @@ public class DLAuditor {
return streamSpaceUsageMap;
}
- private long calculateStreamSpaceUsage(final DistributedLogNamespace namespace,
+ private long calculateStreamSpaceUsage(final Namespace namespace,
final String stream) throws IOException {
DistributedLogManager dlm = namespace.openLog(stream);
long totalBytes = 0;
@@ -504,7 +505,7 @@ public class DLAuditor {
LedgerManager lm = BookKeeperAccessor.getLedgerManager(bkc.get());
- final SettableFuture<Void> doneFuture = SettableFuture.create();
+ final CompletableFuture<Void> doneFuture = FutureUtils.createFuture();
final BookKeeper bk = bkc.get();
BookkeeperInternalCallbacks.Processor<Long> collector =
@@ -544,9 +545,9 @@ public class DLAuditor {
@Override
public void processResult(int rc, String path, Object ctx) {
if (BKException.Code.OK == rc) {
- doneFuture.set(null);
+ doneFuture.complete(null);
} else {
- doneFuture.setException(BKException.create(rc));
+ doneFuture.completeExceptionally(BKException.create(rc));
}
}
};
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java
index 6ea248b..ee33dc3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorDelegator.java
@@ -17,9 +17,10 @@
*/
package org.apache.distributedlog.bk;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Transaction.OpListener;
-import com.twitter.util.Future;
import org.apache.bookkeeper.client.LedgerHandle;
import java.io.IOException;
@@ -57,8 +58,8 @@ public class LedgerAllocatorDelegator implements LedgerAllocator {
}
@Override
- public Future<Void> delete() {
- return Future.exception(new UnsupportedOperationException("Can't delete an allocator by delegator"));
+ public CompletableFuture<Void> delete() {
+ return FutureUtils.exception(new UnsupportedOperationException("Can't delete an allocator by delegator"));
}
@Override
@@ -67,17 +68,17 @@ public class LedgerAllocatorDelegator implements LedgerAllocator {
}
@Override
- public Future<LedgerHandle> tryObtain(Transaction<Object> txn,
+ public CompletableFuture<LedgerHandle> tryObtain(Transaction<Object> txn,
OpListener<LedgerHandle> listener) {
return this.allocator.tryObtain(txn, listener);
}
@Override
- public Future<Void> asyncClose() {
+ public CompletableFuture<Void> asyncClose() {
if (ownAllocator) {
return this.allocator.asyncClose();
} else {
- return Future.value(null);
+ return FutureUtils.value(null);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
index 4fff2f6..19c5546 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/LedgerAllocatorPool.java
@@ -19,17 +19,15 @@ package org.apache.distributedlog.bk;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.exceptions.DLInterruptedException;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.util.ZkUtils;
@@ -40,7 +38,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
import java.io.IOException;
import java.util.ArrayList;
@@ -320,25 +317,25 @@ public class LedgerAllocatorPool implements LedgerAllocator {
}
@Override
- public Future<LedgerHandle> tryObtain(final Transaction<Object> txn,
- final Transaction.OpListener<LedgerHandle> listener) {
+ public CompletableFuture<LedgerHandle> tryObtain(final Transaction<Object> txn,
+ final Transaction.OpListener<LedgerHandle> listener) {
final SimpleLedgerAllocator allocator;
synchronized (this) {
if (allocatingList.isEmpty()) {
- return Future.exception(new IOException("No ledger allocator available under " + poolPath + "."));
+ return FutureUtils.exception(new IOException("No ledger allocator available under " + poolPath + "."));
} else {
allocator = allocatingList.removeFirst();
}
}
- final Promise<LedgerHandle> tryObtainPromise = new Promise<LedgerHandle>();
+ final CompletableFuture<LedgerHandle> tryObtainPromise = new CompletableFuture<LedgerHandle>();
final FutureEventListener<LedgerHandle> tryObtainListener = new FutureEventListener<LedgerHandle>() {
@Override
public void onSuccess(LedgerHandle lh) {
synchronized (LedgerAllocatorPool.this) {
obtainMap.put(lh, allocator);
reverseObtainMap.put(allocator, lh);
- tryObtainPromise.setValue(lh);
+ tryObtainPromise.complete(lh);
}
}
@@ -349,7 +346,7 @@ public class LedgerAllocatorPool implements LedgerAllocator {
} catch (IOException ioe) {
logger.info("Failed to rescue allocator {}", allocator.allocatePath, ioe);
}
- tryObtainPromise.setException(cause);
+ tryObtainPromise.completeExceptionally(cause);
}
};
@@ -365,7 +362,7 @@ public class LedgerAllocatorPool implements LedgerAllocator {
abortObtain(allocator);
listener.onAbort(t);
}
- }).addEventListener(tryObtainListener);
+ }).whenComplete(tryObtainListener);
return tryObtainPromise;
}
@@ -399,7 +396,7 @@ public class LedgerAllocatorPool implements LedgerAllocator {
}
@Override
- public Future<Void> asyncClose() {
+ public CompletableFuture<Void> asyncClose() {
List<LedgerAllocator> allocatorsToClose;
synchronized (this) {
allocatorsToClose = Lists.newArrayListWithExpectedSize(
@@ -414,21 +411,15 @@ public class LedgerAllocatorPool implements LedgerAllocator {
allocatorsToClose.add(allocator);
}
}
- return FutureUtils.processList(allocatorsToClose, new Function<LedgerAllocator, Future<Void>>() {
- @Override
- public Future<Void> apply(LedgerAllocator allocator) {
- return allocator.asyncClose();
- }
- }, scheduledExecutorService).map(new AbstractFunction1<List<Void>, Void>() {
- @Override
- public Void apply(List<Void> values) {
- return null;
- }
- });
+ return FutureUtils.processList(
+ allocatorsToClose,
+ allocator -> allocator.asyncClose(),
+ scheduledExecutorService
+ ).thenApply(values -> null);
}
@Override
- public Future<Void> delete() {
+ public CompletableFuture<Void> delete() {
List<LedgerAllocator> allocatorsToDelete;
synchronized (this) {
allocatorsToDelete = Lists.newArrayListWithExpectedSize(
@@ -443,16 +434,10 @@ public class LedgerAllocatorPool implements LedgerAllocator {
allocatorsToDelete.add(allocator);
}
}
- return FutureUtils.processList(allocatorsToDelete, new Function<LedgerAllocator, Future<Void>>() {
- @Override
- public Future<Void> apply(LedgerAllocator allocator) {
- return allocator.delete();
- }
- }, scheduledExecutorService).flatMap(new AbstractFunction1<List<Void>, Future<Void>>() {
- @Override
- public Future<Void> apply(List<Void> values) {
- return Utils.zkDelete(zkc, poolPath, new ZkVersion(-1));
- }
- });
+ return FutureUtils.processList(
+ allocatorsToDelete,
+ allocator -> allocator.delete(),
+ scheduledExecutorService
+ ).thenCompose(values -> Utils.zkDelete(zkc, poolPath, new ZkVersion(-1)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
index e0102f3..144b0a6 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/bk/SimpleLedgerAllocator.java
@@ -18,19 +18,20 @@
package org.apache.distributedlog.bk;
import com.google.common.collect.Lists;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.function.Function;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Transaction.OpListener;
import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.distributedlog.zk.ZKVersionedSetOp;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Version;
@@ -40,9 +41,6 @@ import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.util.LinkedList;
@@ -96,7 +94,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
// version
ZkVersion version = new ZkVersion(-1);
// outstanding allocation
- Promise<LedgerHandle> allocatePromise;
+ CompletableFuture<LedgerHandle> allocatePromise;
// outstanding tryObtain transaction
Transaction<Object> tryObtainTxn = null;
OpListener<LedgerHandle> tryObtainListener = null;
@@ -105,73 +103,71 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
// Allocated Ledger
LedgerHandle allocatedLh = null;
- Future<Void> closeFuture = null;
- final LinkedList<Future<Void>> ledgerDeletions =
- new LinkedList<Future<Void>>();
+ CompletableFuture<Void> closeFuture = null;
+ final LinkedList<CompletableFuture<Void>> ledgerDeletions =
+ new LinkedList<CompletableFuture<Void>>();
// Ledger configuration
private final QuorumConfigProvider quorumConfigProvider;
- static Future<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath,
+ static CompletableFuture<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath,
final ZooKeeperClient zkc) {
return Utils.zkGetData(zkc, allocatePath, false)
- .flatMap(new AbstractFunction1<Versioned<byte[]>, Future<Versioned<byte[]>>>() {
+ .thenCompose(new Function<Versioned<byte[]>, CompletionStage<Versioned<byte[]>>>() {
@Override
- public Future<Versioned<byte[]>> apply(Versioned<byte[]> result) {
+ public CompletableFuture<Versioned<byte[]>> apply(Versioned<byte[]> result) {
if (null != result && null != result.getVersion() && null != result.getValue()) {
- return Future.value(result);
+ return FutureUtils.value(result);
}
return createAllocationData(allocatePath, zkc);
}
});
}
- private static Future<Versioned<byte[]>> createAllocationData(final String allocatePath,
+ private static CompletableFuture<Versioned<byte[]>> createAllocationData(final String allocatePath,
final ZooKeeperClient zkc) {
try {
- final Promise<Versioned<byte[]>> promise = new Promise<Versioned<byte[]>>();
+ final CompletableFuture<Versioned<byte[]>> promise = new CompletableFuture<Versioned<byte[]>>();
zkc.get().create(allocatePath, DistributedLogConstants.EMPTY_BYTES,
zkc.getDefaultACL(), CreateMode.PERSISTENT,
new org.apache.zookeeper.AsyncCallback.Create2Callback() {
@Override
public void processResult(int rc, String path, Object ctx, String name, Stat stat) {
if (KeeperException.Code.OK.intValue() == rc) {
- promise.setValue(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
+ promise.complete(new Versioned<byte[]>(DistributedLogConstants.EMPTY_BYTES,
new ZkVersion(stat.getVersion())));
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
- Utils.zkGetData(zkc, allocatePath, false).proxyTo(promise);
+ FutureUtils.proxyTo(
+ Utils.zkGetData(zkc, allocatePath, false),
+ promise
+ );
} else {
- promise.setException(FutureUtils.zkException(
+ promise.completeExceptionally(Utils.zkException(
KeeperException.create(KeeperException.Code.get(rc)), allocatePath));
}
}
}, null);
return promise;
} catch (ZooKeeperClient.ZooKeeperConnectionException e) {
- return Future.exception(FutureUtils.zkException(e, allocatePath));
+ return FutureUtils.exception(Utils.zkException(e, allocatePath));
} catch (InterruptedException e) {
- return Future.exception(FutureUtils.zkException(e, allocatePath));
+ return FutureUtils.exception(Utils.zkException(e, allocatePath));
}
}
- public static Future<SimpleLedgerAllocator> of(final String allocatePath,
+ public static CompletableFuture<SimpleLedgerAllocator> of(final String allocatePath,
final Versioned<byte[]> allocationData,
final QuorumConfigProvider quorumConfigProvider,
final ZooKeeperClient zkc,
final BookKeeperClient bkc) {
if (null != allocationData && null != allocationData.getValue()
&& null != allocationData.getVersion()) {
- return Future.value(new SimpleLedgerAllocator(allocatePath, allocationData,
+ return FutureUtils.value(new SimpleLedgerAllocator(allocatePath, allocationData,
quorumConfigProvider, zkc, bkc));
}
return getAndCreateAllocationData(allocatePath, zkc)
- .map(new AbstractFunction1<Versioned<byte[]>, SimpleLedgerAllocator>() {
- @Override
- public SimpleLedgerAllocator apply(Versioned<byte[]> allocationData) {
- return new SimpleLedgerAllocator(allocatePath, allocationData,
- quorumConfigProvider, zkc, bkc);
- }
- });
+ .thenApply(allocationData1 -> new SimpleLedgerAllocator(allocatePath, allocationData1,
+ quorumConfigProvider, zkc, bkc));
}
/**
@@ -240,14 +236,14 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
}
@Override
- public synchronized Future<LedgerHandle> tryObtain(final Transaction<Object> txn,
- final OpListener<LedgerHandle> listener) {
+ public synchronized CompletableFuture<LedgerHandle> tryObtain(final Transaction<Object> txn,
+ final OpListener<LedgerHandle> listener) {
if (Phase.ERROR == phase) {
- return Future.exception(new AllocationException(Phase.ERROR,
+ return FutureUtils.exception(new AllocationException(Phase.ERROR,
"Error on allocating ledger under " + allocatePath));
}
if (Phase.HANDING_OVER == phase || Phase.HANDED_OVER == phase || null != tryObtainTxn) {
- return Future.exception(new ConcurrentObtainException(phase,
+ return FutureUtils.exception(new ConcurrentObtainException(phase,
"Ledger handle is handling over to another thread : " + phase));
}
tryObtainTxn = txn;
@@ -328,13 +324,13 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
return;
}
setPhase(Phase.ALLOCATING);
- allocatePromise = new Promise<LedgerHandle>();
+ allocatePromise = new CompletableFuture<LedgerHandle>();
QuorumConfig quorumConfig = quorumConfigProvider.getQuorumConfig();
bkc.createLedger(
quorumConfig.getEnsembleSize(),
quorumConfig.getWriteQuorumSize(),
quorumConfig.getAckQuorumSize()
- ).addEventListener(this);
+ ).whenComplete(this);
}
private synchronized void completeAllocation(LedgerHandle lh) {
@@ -347,11 +343,11 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
ZKVersionedSetOp commitOp = new ZKVersionedSetOp(zkSetDataOp, this);
tryObtainTxn.addOp(commitOp);
setPhase(Phase.HANDING_OVER);
- FutureUtils.setValue(allocatePromise, lh);
+ allocatePromise.complete(lh);
}
private synchronized void failAllocation(Throwable cause) {
- FutureUtils.setException(allocatePromise, cause);
+ allocatePromise.completeExceptionally(cause);
}
@Override
@@ -386,7 +382,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
private void markAsAllocated(final LedgerHandle lh) {
byte[] data = DLUtils.logSegmentId2Bytes(lh.getId());
Utils.zkSetData(zkc, allocatePath, data, getVersion())
- .addEventListener(new FutureEventListener<ZkVersion>() {
+ .whenComplete(new FutureEventListener<ZkVersion>() {
@Override
public void onSuccess(ZkVersion version) {
// we only issue deleting ledger left from previous allocation when we could allocate first ledger
@@ -411,27 +407,20 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
}
void deleteLedger(final long ledgerId) {
- final Future<Void> deleteFuture = bkc.deleteLedger(ledgerId, true);
+ final CompletableFuture<Void> deleteFuture = bkc.deleteLedger(ledgerId, true);
synchronized (ledgerDeletions) {
ledgerDeletions.add(deleteFuture);
}
- deleteFuture.onFailure(new AbstractFunction1<Throwable, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable cause) {
+ deleteFuture.whenComplete((value, cause) -> {
+ if (null != cause) {
LOG.error("Error deleting ledger {} for ledger allocator {}, retrying : ",
new Object[] { ledgerId, allocatePath, cause });
if (!isClosing()) {
deleteLedger(ledgerId);
}
- return BoxedUnit.UNIT;
}
- }).ensure(new AbstractFunction0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- synchronized (ledgerDeletions) {
- ledgerDeletions.remove(deleteFuture);
- }
- return BoxedUnit.UNIT;
+ synchronized (ledgerDeletions) {
+ ledgerDeletions.remove(deleteFuture);
}
});
}
@@ -440,25 +429,25 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
return closeFuture != null;
}
- private Future<Void> closeInternal(boolean cleanup) {
- Promise<Void> closePromise;
+ private CompletableFuture<Void> closeInternal(boolean cleanup) {
+ CompletableFuture<Void> closePromise;
synchronized (this) {
if (null != closeFuture) {
return closeFuture;
}
- closePromise = new Promise<Void>();
+ closePromise = new CompletableFuture<Void>();
closeFuture = closePromise;
}
if (!cleanup) {
LOG.info("Abort ledger allocator without cleaning up on {}.", allocatePath);
- FutureUtils.setValue(closePromise, null);
+ closePromise.complete(null);
return closePromise;
}
cleanupAndClose(closePromise);
return closePromise;
}
- private void cleanupAndClose(final Promise<Void> closePromise) {
+ private void cleanupAndClose(final CompletableFuture<Void> closePromise) {
LOG.info("Closing ledger allocator on {}.", allocatePath);
final ZKTransaction txn = new ZKTransaction(zkc);
// try obtain ledger handle
@@ -476,21 +465,21 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
}
private void complete() {
- FutureUtils.setValue(closePromise, null);
+ closePromise.complete(null);
LOG.info("Closed ledger allocator on {}.", allocatePath);
}
- }).addEventListener(new FutureEventListener<LedgerHandle>() {
+ }).whenComplete(new FutureEventListener<LedgerHandle>() {
@Override
public void onSuccess(LedgerHandle lh) {
// try obtain succeed
// if we could obtain the ledger handle, we have the responsibility to close it
deleteLedger(lh.getId());
// wait for deletion to be completed
- List<Future<Void>> outstandingDeletions;
+ List<CompletableFuture<Void>> outstandingDeletions;
synchronized (ledgerDeletions) {
outstandingDeletions = Lists.newArrayList(ledgerDeletions);
}
- Future.collect(outstandingDeletions).addEventListener(new FutureEventListener<List<Void>>() {
+ FutureUtils.collect(outstandingDeletions).whenComplete(new FutureEventListener<List<Void>>() {
@Override
public void onSuccess(List<Void> values) {
txn.execute();
@@ -499,7 +488,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
@Override
public void onFailure(Throwable cause) {
LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
- FutureUtils.setValue(closePromise, null);
+ closePromise.complete(null);
}
});
}
@@ -507,7 +496,7 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
@Override
public void onFailure(Throwable cause) {
LOG.debug("Fail to obtain the allocated ledger handle when closing the allocator : ", cause);
- FutureUtils.setValue(closePromise, null);
+ closePromise.complete(null);
}
});
@@ -519,18 +508,13 @@ public class SimpleLedgerAllocator implements LedgerAllocator, FutureEventListen
}
@Override
- public Future<Void> asyncClose() {
+ public CompletableFuture<Void> asyncClose() {
return closeInternal(false);
}
@Override
- public Future<Void> delete() {
- return closeInternal(true).flatMap(new AbstractFunction1<Void, Future<Void>>() {
- @Override
- public Future<Void> apply(Void value) {
- return Utils.zkDelete(zkc, allocatePath, getVersion());
- }
- });
+ public CompletableFuture<Void> delete() {
+ return closeInternal(true).thenCompose(value -> Utils.zkDelete(zkc, allocatePath, getVersion()));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java b/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java
deleted file mode 100644
index dccd2e8..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/callback/ReadAheadCallback.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.callback;
-
-/**
- * ReadAhead Callback
- */
-public interface ReadAheadCallback {
- void resumeReadAhead();
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java
deleted file mode 100644
index f189ad3..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentBaseConfiguration.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.commons.configuration.AbstractConfiguration;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Configuration view built on concurrent hash map for fast thread-safe access.
- * Notes:
- * 1. Multi-property list aggregation will not work in this class. I.e. commons config
- * normally combines all properties with the same key into one list property automatically.
- * This class simply overwrites any existing mapping.
- */
-public class ConcurrentBaseConfiguration extends AbstractConfiguration {
- static final Logger LOG = LoggerFactory.getLogger(ConcurrentBaseConfiguration.class);
-
- private final ConcurrentHashMap<String, Object> map;
-
- public ConcurrentBaseConfiguration() {
- this.map = new ConcurrentHashMap<String, Object>();
- }
-
- @Override
- protected void addPropertyDirect(String key, Object value) {
- Preconditions.checkNotNull(value);
- map.put(key, value);
- }
-
- @Override
- public Object getProperty(String key) {
- return map.get(key);
- }
-
- @Override
- public Iterator getKeys() {
- return map.keySet().iterator();
- }
-
- @Override
- public boolean containsKey(String key) {
- return map.containsKey(key);
- }
-
- @Override
- public boolean isEmpty() {
- return map.isEmpty();
- }
-
- @Override
- protected void clearPropertyDirect(String key) {
- map.remove(key);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java
deleted file mode 100644
index 4e7f886..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConcurrentConstConfiguration.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import com.google.common.base.Preconditions;
-import org.apache.commons.configuration.Configuration;
-
-/**
- * Invariant thread-safe view of some configuration.
- */
-public class ConcurrentConstConfiguration extends ConcurrentBaseConfiguration {
- public ConcurrentConstConfiguration(Configuration conf) {
- Preconditions.checkNotNull(conf);
- copy(conf);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java
deleted file mode 100644
index 70059d4..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-/**
- * Configuration listener triggered when reloading configuration settings.
- */
-public interface ConfigurationListener {
-
- /**
- * Reload the configuration.
- *
- * @param conf configuration to reload
- */
- void onReload(ConcurrentBaseConfiguration conf);
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java
deleted file mode 100644
index 0e5c897..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/ConfigurationSubscription.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import java.io.FileNotFoundException;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.Iterator;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Sets;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.FileConfiguration;
-import org.apache.commons.configuration.reloading.FileChangedReloadingStrategy;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * ConfigurationSubscription publishes a reloading, thread-safe view of file configuration. The class
- * periodically calls FileConfiguration.reload on the underlying conf, and propagates changes to the
- * concurrent config. The configured FileChangedReloadingStrategy ensures that file config will only
- * be reloaded if something changed.
- * Notes:
- * 1. Reload schedule is never terminated. The assumption is a finite number of these are started
- * at the calling layer, and terminated only once the executor service is shut down.
- * 2. The underlying FileConfiguration is not at all thread-safe, so its important to ensure access
- * to this object is always single threaded.
- */
-public class ConfigurationSubscription {
- static final Logger LOG = LoggerFactory.getLogger(ConfigurationSubscription.class);
-
- private final ConcurrentBaseConfiguration viewConfig;
- private final ScheduledExecutorService executorService;
- private final int reloadPeriod;
- private final TimeUnit reloadUnit;
- private final List<FileConfigurationBuilder> fileConfigBuilders;
- private final List<FileConfiguration> fileConfigs;
- private final CopyOnWriteArraySet<ConfigurationListener> confListeners;
-
- public ConfigurationSubscription(ConcurrentBaseConfiguration viewConfig,
- List<FileConfigurationBuilder> fileConfigBuilders,
- ScheduledExecutorService executorService,
- int reloadPeriod,
- TimeUnit reloadUnit)
- throws ConfigurationException {
- Preconditions.checkNotNull(fileConfigBuilders);
- Preconditions.checkArgument(!fileConfigBuilders.isEmpty());
- Preconditions.checkNotNull(executorService);
- Preconditions.checkNotNull(viewConfig);
- this.viewConfig = viewConfig;
- this.executorService = executorService;
- this.reloadPeriod = reloadPeriod;
- this.reloadUnit = reloadUnit;
- this.fileConfigBuilders = fileConfigBuilders;
- this.fileConfigs = Lists.newArrayListWithExpectedSize(this.fileConfigBuilders.size());
- this.confListeners = new CopyOnWriteArraySet<ConfigurationListener>();
- reload();
- scheduleReload();
- }
-
- public void registerListener(ConfigurationListener listener) {
- this.confListeners.add(listener);
- }
-
- public void unregisterListener(ConfigurationListener listener) {
- this.confListeners.remove(listener);
- }
-
- private boolean initConfig() {
- if (fileConfigs.isEmpty()) {
- try {
- for (FileConfigurationBuilder fileConfigBuilder : fileConfigBuilders) {
- FileConfiguration fileConfig = fileConfigBuilder.getConfiguration();
- FileChangedReloadingStrategy reloadingStrategy = new FileChangedReloadingStrategy();
- reloadingStrategy.setRefreshDelay(0);
- fileConfig.setReloadingStrategy(reloadingStrategy);
- fileConfigs.add(fileConfig);
- }
- } catch (ConfigurationException ex) {
- if (!fileNotFound(ex)) {
- LOG.error("Config init failed {}", ex);
- }
- }
- }
- return !fileConfigs.isEmpty();
- }
-
- private void scheduleReload() {
- executorService.scheduleAtFixedRate(new Runnable() {
- @Override
- public void run() {
- reload();
- }
- }, 0, reloadPeriod, reloadUnit);
- }
-
- @VisibleForTesting
- void reload() {
- // No-op if already loaded.
- if (!initConfig()) {
- return;
- }
- // Reload if config exists.
- Set<String> confKeys = Sets.newHashSet();
- for (FileConfiguration fileConfig : fileConfigs) {
- LOG.debug("Check and reload config, file={}, lastModified={}", fileConfig.getFile(),
- fileConfig.getFile().lastModified());
- fileConfig.reload();
- // load keys
- Iterator keyIter = fileConfig.getKeys();
- while (keyIter.hasNext()) {
- String key = (String) keyIter.next();
- confKeys.add(key);
- }
- }
- // clear unexisted keys
- Iterator viewIter = viewConfig.getKeys();
- while (viewIter.hasNext()) {
- String key = (String) viewIter.next();
- if (!confKeys.contains(key)) {
- clearViewProperty(key);
- }
- }
- LOG.info("Reload features : {}", confKeys);
- // load keys from files
- for (FileConfiguration fileConfig : fileConfigs) {
- try {
- loadView(fileConfig);
- } catch (Exception ex) {
- if (!fileNotFound(ex)) {
- LOG.error("Config reload failed for file {}", fileConfig.getFileName(), ex);
- }
- }
- }
- for (ConfigurationListener listener : confListeners) {
- listener.onReload(viewConfig);
- }
- }
-
- private boolean fileNotFound(Exception ex) {
- return ex instanceof FileNotFoundException ||
- ex.getCause() != null && ex.getCause() instanceof FileNotFoundException;
- }
-
- private void loadView(FileConfiguration fileConfig) {
- Iterator fileIter = fileConfig.getKeys();
- while (fileIter.hasNext()) {
- String key = (String) fileIter.next();
- setViewProperty(fileConfig, key, fileConfig.getProperty(key));
- }
- }
-
- private void clearViewProperty(String key) {
- LOG.debug("Removing property, key={}", key);
- viewConfig.clearProperty(key);
- }
-
- private void setViewProperty(FileConfiguration fileConfig,
- String key,
- Object value) {
- if (!viewConfig.containsKey(key) || !viewConfig.getProperty(key).equals(value)) {
- LOG.debug("Setting property, key={} value={}", key, fileConfig.getProperty(key));
- viewConfig.setProperty(key, fileConfig.getProperty(key));
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
index c77778a..c69b7a5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicConfigurationFactory.java
@@ -24,7 +24,6 @@ import com.google.common.collect.Lists;
import org.apache.distributedlog.DistributedLogConfiguration;
import java.io.File;
-import java.io.FileNotFoundException;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.LinkedList;
@@ -35,6 +34,11 @@ import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.ConfigurationSubscription;
+import org.apache.distributedlog.common.config.FileConfigurationBuilder;
+import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
index ea7f4a7..9e760c5 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/config/DynamicDistributedLogConfiguration.java
@@ -19,6 +19,7 @@ package org.apache.distributedlog.config;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.bk.QuorumConfig;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
import static org.apache.distributedlog.DistributedLogConfiguration.*;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java
deleted file mode 100644
index dbf8fe7..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/FileConfigurationBuilder.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.FileConfiguration;
-
-/**
- * Abstract out FileConfiguration subclass construction.
- */
-public interface FileConfigurationBuilder {
- FileConfiguration getConfiguration() throws ConfigurationException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java
deleted file mode 100644
index df1408c..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/config/PropertiesConfigurationBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import java.net.URL;
-
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.configuration.FileConfiguration;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-/**
- * Hide PropertiesConfiguration dependency.
- */
-public class PropertiesConfigurationBuilder implements FileConfigurationBuilder {
- private URL url;
-
- public PropertiesConfigurationBuilder(URL url) {
- this.url = url;
- }
-
- @Override
- public FileConfiguration getConfiguration() throws ConfigurationException {
- return new PropertiesConfiguration(url);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java
index 83cac22..f51302e 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/ConfigurationFeatureProvider.java
@@ -17,7 +17,7 @@
*/
package org.apache.distributedlog.feature;
-import org.apache.distributedlog.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
import org.apache.bookkeeper.feature.CacheableFeatureProvider;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java
index 4689d51..201ed8a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/feature/DynamicConfigurationFeatureProvider.java
@@ -21,11 +21,11 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentBaseConfiguration;
-import org.apache.distributedlog.config.ConfigurationListener;
-import org.apache.distributedlog.config.ConfigurationSubscription;
-import org.apache.distributedlog.config.FileConfigurationBuilder;
-import org.apache.distributedlog.config.PropertiesConfigurationBuilder;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConfigurationListener;
+import org.apache.distributedlog.common.config.ConfigurationSubscription;
+import org.apache.distributedlog.common.config.FileConfigurationBuilder;
+import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeature;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java
deleted file mode 100644
index b1adf4a..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/CloseAsyncCloseableFunction.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.function;
-
-import org.apache.distributedlog.io.AsyncCloseable;
-import scala.Function0;
-import scala.runtime.AbstractFunction0;
-import scala.runtime.BoxedUnit;
-
-/**
- * Function to close {@link org.apache.distributedlog.io.AsyncCloseable}
- */
-public class CloseAsyncCloseableFunction extends AbstractFunction0<BoxedUnit> {
-
- /**
- * Return a function to close an {@link AsyncCloseable}.
- *
- * @param closeable closeable to close
- * @return function to close an {@link AsyncCloseable}
- */
- public static Function0<BoxedUnit> of(AsyncCloseable closeable) {
- return new CloseAsyncCloseableFunction(closeable);
- }
-
- private final AsyncCloseable closeable;
-
- private CloseAsyncCloseableFunction(AsyncCloseable closeable) {
- this.closeable = closeable;
- }
-
- @Override
- public BoxedUnit apply() {
- closeable.asyncClose();
- return BoxedUnit.UNIT;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java
deleted file mode 100644
index 6360f2c..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/DefaultValueMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.function;
-
-import scala.runtime.AbstractFunction1;
-
-/**
- * Map Function return default value
- */
-public class DefaultValueMapFunction<T, R> extends AbstractFunction1<T, R> {
-
- public static <T, R> DefaultValueMapFunction<T, R> of(R defaultValue) {
- return new DefaultValueMapFunction<T, R>(defaultValue);
- }
-
- private final R defaultValue;
-
- private DefaultValueMapFunction(R defaultValue) {
- this.defaultValue = defaultValue;
- }
-
- @Override
- public R apply(T any) {
- return defaultValue;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java
index 1bf620c..00703e3 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetLastTxIdFunction.java
@@ -17,16 +17,15 @@
*/
package org.apache.distributedlog.function;
+import java.util.List;
+import java.util.function.Function;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.LogSegmentMetadata;
-import scala.runtime.AbstractFunction1;
-
-import java.util.List;
/**
* Retrieve the last tx id from list of log segments
*/
-public class GetLastTxIdFunction extends AbstractFunction1<List<LogSegmentMetadata>, Long> {
+public class GetLastTxIdFunction implements Function<List<LogSegmentMetadata>, Long> {
public static final GetLastTxIdFunction INSTANCE = new GetLastTxIdFunction();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java b/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java
deleted file mode 100644
index 98164de..0000000
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/function/GetVersionedValueFunction.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.function;
-
-import org.apache.distributedlog.LogSegmentMetadata;
-import org.apache.bookkeeper.versioning.Versioned;
-import scala.Function1;
-import scala.runtime.AbstractFunction1;
-
-import java.util.List;
-
-/**
- * Function to get the versioned value from {@link org.apache.bookkeeper.versioning.Versioned}
- */
-public class GetVersionedValueFunction<T> extends AbstractFunction1<Versioned<T>, T> {
-
- public static final Function1<Versioned<List<LogSegmentMetadata>>, List<LogSegmentMetadata>>
- GET_LOGSEGMENT_LIST_FUNC = new GetVersionedValueFunction<List<LogSegmentMetadata>>();
-
- @Override
- public T apply(Versioned<T> versionedValue) {
- return versionedValue.getValue();
- }
-}