You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:34 UTC
[02/23] incubator-distributedlog git commit: DL-124: Use Java8 Future
rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java
deleted file mode 100644
index db0ee4e..0000000
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TimedOutTestsListener.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.management.LockInfo;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MonitorInfo;
-import java.lang.management.ThreadInfo;
-import java.lang.management.ThreadMXBean;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.junit.runner.notification.Failure;
-import org.junit.runner.notification.RunListener;
-
-/**
- * JUnit run listener which prints full thread dump into System.err in case a test is failed due to
- * timeout.
- */
-public class TimedOutTestsListener extends RunListener {
-
- static final String TEST_TIMED_OUT_PREFIX = "test timed out after";
-
- private static String indent = " ";
-
- private final PrintWriter output;
-
- public TimedOutTestsListener() {
- this.output = new PrintWriter(System.err);
- }
-
- public TimedOutTestsListener(PrintWriter output) {
- this.output = output;
- }
-
- @Override
- public void testFailure(Failure failure) throws Exception {
- if (failure != null && failure.getMessage() != null && failure.getMessage().startsWith(TEST_TIMED_OUT_PREFIX)) {
- output.println("====> TEST TIMED OUT. PRINTING THREAD DUMP. <====");
- output.println();
- output.print(buildThreadDiagnosticString());
- }
- }
-
- public static String buildThreadDiagnosticString() {
- StringWriter sw = new StringWriter();
- PrintWriter output = new PrintWriter(sw);
-
- DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss,SSS");
- output.println(String.format("Timestamp: %s", dateFormat.format(new Date())));
- output.println();
- output.println(buildThreadDump());
-
- String deadlocksInfo = buildDeadlockInfo();
- if (deadlocksInfo != null) {
- output.println("====> DEADLOCKS DETECTED <====");
- output.println();
- output.println(deadlocksInfo);
- }
-
- return sw.toString();
- }
-
- static String buildThreadDump() {
- StringBuilder dump = new StringBuilder();
- Map<Thread, StackTraceElement[]> stackTraces = Thread.getAllStackTraces();
- for (Map.Entry<Thread, StackTraceElement[]> e : stackTraces.entrySet()) {
- Thread thread = e.getKey();
- dump.append(String.format("\"%s\" %s prio=%d tid=%d %s\njava.lang.Thread.State: %s", thread.getName(),
- (thread.isDaemon() ? "daemon" : ""), thread.getPriority(), thread.getId(),
- Thread.State.WAITING.equals(thread.getState()) ? "in Object.wait()"
- : StringUtils.lowerCase(thread.getState().name()),
- Thread.State.WAITING.equals(thread.getState()) ? "WAITING (on object monitor)" : thread.getState()));
- for (StackTraceElement stackTraceElement : e.getValue()) {
- dump.append("\n at ");
- dump.append(stackTraceElement);
- }
- dump.append("\n");
- }
- return dump.toString();
- }
-
- static String buildDeadlockInfo() {
- ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
- long[] threadIds = threadBean.findMonitorDeadlockedThreads();
- if (threadIds != null && threadIds.length > 0) {
- StringWriter stringWriter = new StringWriter();
- PrintWriter out = new PrintWriter(stringWriter);
-
- ThreadInfo[] infos = threadBean.getThreadInfo(threadIds, true, true);
- for (ThreadInfo ti : infos) {
- printThreadInfo(ti, out);
- printLockInfo(ti.getLockedSynchronizers(), out);
- out.println();
- }
-
- out.close();
- return stringWriter.toString();
- } else {
- return null;
- }
- }
-
- private static void printThreadInfo(ThreadInfo ti, PrintWriter out) {
- // print thread information
- printThread(ti, out);
-
- // print stack trace with locks
- StackTraceElement[] stacktrace = ti.getStackTrace();
- MonitorInfo[] monitors = ti.getLockedMonitors();
- for (int i = 0; i < stacktrace.length; i++) {
- StackTraceElement ste = stacktrace[i];
- out.println(indent + "at " + ste.toString());
- for (MonitorInfo mi : monitors) {
- if (mi.getLockedStackDepth() == i) {
- out.println(indent + " - locked " + mi);
- }
- }
- }
- out.println();
- }
-
- private static void printThread(ThreadInfo ti, PrintWriter out) {
- out.print("\"" + ti.getThreadName() + "\"" + " Id=" + ti.getThreadId() + " in " + ti.getThreadState());
- if (ti.getLockName() != null) {
- out.print(" on lock=" + ti.getLockName());
- }
- if (ti.isSuspended()) {
- out.print(" (suspended)");
- }
- if (ti.isInNative()) {
- out.print(" (running in native)");
- }
- out.println();
- if (ti.getLockOwnerName() != null) {
- out.println(indent + " owned by " + ti.getLockOwnerName() + " Id=" + ti.getLockOwnerId());
- }
- }
-
- private static void printLockInfo(LockInfo[] locks, PrintWriter out) {
- out.println(indent + "Locked synchronizers: count = " + locks.length);
- for (LockInfo li : locks) {
- out.println(indent + " - " + li);
- }
- out.println();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/test/resources/log4j.properties b/distributedlog-protocol/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3e51059
--- /dev/null
+++ b/distributedlog-protocol/src/test/resources/log4j.properties
@@ -0,0 +1,51 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+#
+# DisributedLog Logging Configuration
+#
+
+# Example with rolling log file
+log4j.rootLogger=INFO, CONSOLE
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+#Set the bookkeeper level to warning
+log4j.logger.org.apache.bookkeeper=INFO
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Add ROLLINGFILE to rootLogger to get log file output
+# Log DEBUG level and above messages to a log file
+#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+#log4j.appender.ROLLINGFILE.Threshold=INFO
+#log4j.appender.ROLLINGFILE.File=distributedlog.log
+#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
+#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=TRACE
+log4j.appender.R.File=target/error.log
+log4j.appender.R.MaxFileSize=200MB
+log4j.appender.R.MaxBackupIndex=7
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-client/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/pom.xml b/distributedlog-proxy-client/pom.xml
index 7392d90..25ad732 100644
--- a/distributedlog-proxy-client/pom.xml
+++ b/distributedlog-proxy-client/pom.xml
@@ -86,7 +86,7 @@
</dependency>
<dependency>
<groupId>org.apache.distributedlog</groupId>
- <artifactId>distributedlog-protocol</artifactId>
+ <artifactId>distributedlog-common</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -129,7 +129,7 @@
<properties>
<property>
<name>listener</name>
- <value>org.apache.distributedlog.TimedOutTestsListener</value>
+ <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value>
</property>
</properties>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
index b3f3368..781005c 100644
--- a/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
+++ b/distributedlog-proxy-client/src/main/java/org/apache/distributedlog/client/DistributedLogMultiStreamWriter.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
import static org.apache.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newJFuture;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
@@ -440,7 +441,7 @@ public class DistributedLogMultiStreamWriter implements Runnable {
}
Promise<DLSN> writePromise = new Promise<DLSN>();
try {
- recordSetWriter.writeRecord(buffer, writePromise);
+ recordSetWriter.writeRecord(buffer, newJFuture(writePromise));
} catch (LogRecordTooLongException e) {
return Future.exception(e);
} catch (WriteException e) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java
new file mode 100644
index 0000000..6ce1fa4
--- /dev/null
+++ b/distributedlog-proxy-protocol/src/main/java/org/apache/distributedlog/protocol/util/TwitterFutureUtils.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.protocol.util;
+
+import com.google.common.collect.Lists;
+import com.twitter.util.Future;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+
+/**
+ * Utils for Twitter's {@link com.twitter.util.Future}.
+ */
+public final class TwitterFutureUtils {
+
+ private TwitterFutureUtils() {}
+
+ public static <T> CompletableFuture<T> newJFuture(Promise<T> promise) {
+ CompletableFuture<T> jFuture = FutureUtils.createFuture();
+ jFuture.whenComplete((value, cause) -> {
+ if (null != cause) {
+ if (cause instanceof CompletionException) {
+ promise.setException(cause.getCause());
+ } else {
+ promise.setException(cause);
+ }
+ } else {
+ promise.setValue(value);
+ }
+ });
+ return jFuture;
+ }
+
+ public static <T> Future<T> newTFuture(CompletableFuture<T> jFuture) {
+ Promise<T> promise = new Promise<>();
+ jFuture.whenComplete((value, cause) -> {
+ if (null != cause) {
+ if (cause instanceof CompletionException) {
+ promise.setException(cause.getCause());
+ } else {
+ promise.setException(cause);
+ }
+ } else {
+ promise.setValue(value);
+ }
+ });
+ return promise;
+ }
+
+ public static <T> Future<List<Future<T>>> newTFutureList(
+ CompletableFuture<List<CompletableFuture<T>>> jFutureList) {
+ Promise<List<Future<T>>> promise = new Promise<>();
+ jFutureList.whenComplete((value, cause) -> {
+ if (null != cause) {
+ if (cause instanceof CompletionException) {
+ promise.setException(cause.getCause());
+ } else {
+ promise.setException(cause);
+ }
+ } else {
+ promise.setValue(Lists.transform(
+ value,
+ future -> newTFuture(future)));
+ }
+ });
+ return promise;
+ }
+
+ public static <T> void setValue(Promise<T> promise, T value) {
+ promise.updateIfEmpty(new Return<T>(value));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/pom.xml b/distributedlog-proxy-server/pom.xml
index d7cbd56..83b2bef 100644
--- a/distributedlog-proxy-server/pom.xml
+++ b/distributedlog-proxy-server/pom.xml
@@ -135,7 +135,7 @@
</dependency>
<dependency>
<groupId>org.apache.distributedlog</groupId>
- <artifactId>distributedlog-protocol</artifactId>
+ <artifactId>distributedlog-common</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
@@ -189,7 +189,7 @@
<properties>
<property>
<name>listener</name>
- <value>org.apache.distributedlog.TimedOutTestsListener</value>
+ <value>org.apache.distributedlog.common.util.TimedOutTestsListener</value>
</property>
</properties>
</configuration>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
index 81e476b..c904499 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServer.java
@@ -37,7 +37,7 @@ import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConver
import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
import org.apache.distributedlog.thrift.service.DistributedLogService;
import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
import com.twitter.finagle.Stack;
import com.twitter.finagle.ThriftMuxServer$;
import com.twitter.finagle.builder.Server;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
index c37cd53..72f2758 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/DistributedLogServiceImpl.java
@@ -25,6 +25,7 @@ import com.twitter.common.net.InetSocketAddressHelper;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.acl.AccessControlManager;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
import org.apache.distributedlog.client.resolver.RegionResolver;
import org.apache.distributedlog.client.routing.RoutingService;
@@ -35,10 +36,9 @@ import org.apache.distributedlog.exceptions.ServiceUnavailableException;
import org.apache.distributedlog.exceptions.StreamUnavailableException;
import org.apache.distributedlog.exceptions.TooManyStreamsException;
import org.apache.distributedlog.feature.AbstractFeatureProvider;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import org.apache.distributedlog.rate.MovingAverageRate;
-import org.apache.distributedlog.rate.MovingAverageRateFactory;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.rate.MovingAverageRate;
+import org.apache.distributedlog.common.rate.MovingAverageRateFactory;
import org.apache.distributedlog.service.config.ServerConfiguration;
import org.apache.distributedlog.service.config.StreamConfigProvider;
import org.apache.distributedlog.service.placement.LeastLoadPlacementPolicy;
@@ -76,7 +76,7 @@ import org.apache.distributedlog.thrift.service.WriteContext;
import org.apache.distributedlog.thrift.service.WriteResponse;
import org.apache.distributedlog.util.ConfUtils;
import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Function;
@@ -116,7 +116,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
private final ServerConfiguration serverConfig;
private final DistributedLogConfiguration dlConfig;
- private final DistributedLogNamespace dlNamespace;
+ private final Namespace dlNamespace;
private final int serverRegionId;
private final PlacementPolicy placementPolicy;
private ServerStatus serverStatus = ServerStatus.WRITE_AND_ACCEPT;
@@ -199,7 +199,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
}
// Build the namespace
- this.dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
+ this.dlNamespace = NamespaceBuilder.newBuilder()
.conf(dlConf)
.uri(uri)
.statsLogger(statsLogger)
@@ -218,8 +218,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
this.scheduler = OrderedScheduler.newBuilder()
.corePoolSize(numThreads)
.name("DistributedLogService-Executor")
- .traceTaskExecution(true)
- .statsLogger(statsLogger.scope("scheduler"))
.build();
// Timer, kept separate to ensure reliability of timeouts.
@@ -261,7 +259,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
// Resource limiting
this.timer = new ScheduledThreadPoolTimer(1, "timer", true);
- this.movingAvgFactory = new MovingAverageRateFactory(timer);
+ this.movingAvgFactory = new MovingAverageRateFactory(scheduler);
this.windowedRps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
this.windowedBps = movingAvgFactory.create(MOVING_AVERAGE_WINDOW_SECS);
this.limiter = new ServiceRequestLimiter(
@@ -783,7 +781,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
}
@VisibleForTesting
- public DistributedLogNamespace getDistributedLogNamespace() {
+ public Namespace getDistributedLogNamespace() {
return dlNamespace;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
index b1e2879..969c598 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/MonitorService.java
@@ -27,14 +27,14 @@ import com.google.common.hash.Hashing;
import com.twitter.common.zookeeper.ServerSet;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.callback.LogSegmentListener;
import org.apache.distributedlog.callback.NamespaceListener;
import org.apache.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.distributedlog.client.serverset.DLZkServerSet;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.Stat;
import com.twitter.finagle.stats.StatsReceiver;
@@ -71,7 +71,7 @@ public class MonitorService implements NamespaceListener {
private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
- private DistributedLogNamespace dlNamespace = null;
+ private Namespace dlNamespace = null;
private MonitorServiceClient dlClient = null;
private DLZkServerSet[] zkServerSets = null;
private final ScheduledExecutorService executorService =
@@ -411,7 +411,7 @@ public class MonitorService implements NamespaceListener {
// stats
statsProvider.getStatsLogger("monitor").registerGauge("num_streams", numOfStreamsGauge);
logger.info("Construct dl namespace @ {}", dlUri);
- dlNamespace = DistributedLogNamespaceBuilder.newBuilder()
+ dlNamespace = NamespaceBuilder.newBuilder()
.conf(conf)
.uri(dlUri)
.build();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
index 08f4b41..b327867 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/ResponseUtils.java
@@ -17,6 +17,7 @@
*/
package org.apache.distributedlog.service;
+import java.util.concurrent.CompletionException;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.thrift.service.BulkWriteResponse;
@@ -53,6 +54,8 @@ public class ResponseUtils {
}
response.setCode(StatusCode.findByValue(dle.getCode()));
response.setErrMsg(dle.getMessage());
+ } else if (t instanceof CompletionException) {
+ return exceptionToHeader(t.getCause());
} else {
response.setCode(StatusCode.INTERNAL_SERVER_ERROR);
response.setErrMsg("Internal server error : " + t.getMessage());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
index 7d72093..53e16b4 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/DefaultStreamConfigProvider.java
@@ -20,11 +20,11 @@ package org.apache.distributedlog.service.config;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
-import org.apache.distributedlog.config.ConfigurationSubscription;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.ConfigurationSubscription;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.config.FileConfigurationBuilder;
-import org.apache.distributedlog.config.PropertiesConfigurationBuilder;
+import org.apache.distributedlog.common.config.FileConfigurationBuilder;
+import org.apache.distributedlog.common.config.PropertiesConfigurationBuilder;
import java.io.File;
import java.net.MalformedURLException;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
index 257b4be..1e62302 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/config/ServerConfiguration.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
import org.apache.bookkeeper.util.ReflectionUtils;
@@ -160,7 +161,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set the region id used to instantiate DistributedLogNamespace.
+ * Set the region id used to instantiate Namespace.
*
* @param regionId
* region id
@@ -172,9 +173,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get the region id used to instantiate {@link org.apache.distributedlog.namespace.DistributedLogNamespace}.
+ * Get the region id used to instantiate {@link Namespace}.
*
- * @return region id used to instantiate DistributedLogNamespace
+ * @return region id used to instantiate Namespace
*/
public int getRegionId() {
return getInt(SERVER_REGION_ID, SERVER_REGION_ID_DEFAULT);
@@ -216,7 +217,7 @@ public class ServerConfiguration extends CompositeConfiguration {
/**
* Get the shard id of this server.
*
- * <p>It would be used to instantiate the client id used for DistributedLogNamespace.
+ * <p>It would be used to instantiate the client id used for Namespace.
*
* @return shard id of this server.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
index 2e9dd6b..1336ddd 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -18,7 +18,7 @@
package org.apache.distributedlog.service.placement;
import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.api.namespace.Namespace;
import com.twitter.util.Duration;
import com.twitter.util.Function;
import com.twitter.util.Future;
@@ -52,7 +52,7 @@ public class LeastLoadPlacementPolicy extends PlacementPolicy {
private Map<String, String> streamToServer = new HashMap<String, String>();
public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
- DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+ Namespace namespace, PlacementStateManager placementStateManager,
Duration refreshInterval, StatsLogger statsLogger) {
super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
index ac952aa..17edc22 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/PlacementPolicy.java
@@ -17,8 +17,8 @@
*/
package org.apache.distributedlog.service.placement;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.service.DLSocketAddress;
import com.twitter.util.Duration;
import com.twitter.util.Function0;
@@ -53,14 +53,14 @@ public abstract class PlacementPolicy {
protected final LoadAppraiser loadAppraiser;
protected final RoutingService routingService;
- protected final DistributedLogNamespace namespace;
+ protected final Namespace namespace;
protected final PlacementStateManager placementStateManager;
private final Duration refreshInterval;
protected final OpStatsLogger placementCalcStats;
private Timer placementRefreshTimer;
public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
- DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+ Namespace namespace, PlacementStateManager placementStateManager,
Duration refreshInterval, StatsLogger statsLogger) {
this.loadAppraiser = loadAppraiser;
this.routingService = routingService;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
index 862f05a..5dcea73 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -63,8 +63,7 @@ public class ZKPlacementStateManager implements PlacementStateManager {
serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
}
- private void createServerLoadPathIfNoExists(byte[] data)
- throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+ private void createServerLoadPathIfNoExists(byte[] data) throws KeeperException, IOException {
try {
Utils.zkCreateFullPathOptimistic(
zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
@@ -152,7 +151,7 @@ public class ZKPlacementStateManager implements PlacementStateManager {
watching = false;
watch(callback);
}
- } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+ } catch (InterruptedException | IOException | KeeperException e) {
logger.error("Watch of Ownership failed", e);
watching = false;
watch(callback);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
index 83ac668..7700184 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java
@@ -18,13 +18,13 @@
package org.apache.distributedlog.service.stream;
import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.exceptions.ChecksumFailedException;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.service.ResponseUtils;
import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
index 6c98468..372703a 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java
@@ -17,7 +17,9 @@
*/
package org.apache.distributedlog.service.stream;
-import org.apache.distributedlog.AsyncLogWriter;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFutureList;
+
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.acl.AccessControlManager;
@@ -33,7 +35,7 @@ import org.apache.distributedlog.thrift.service.BulkWriteResponse;
import org.apache.distributedlog.thrift.service.ResponseHeader;
import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.ConstFuture;
import com.twitter.util.Future;
import com.twitter.util.Future$;
@@ -157,7 +159,7 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements
Future<List<Future<DLSN>>> futureList;
synchronized (txnLock) {
records = asRecordList(buffers, sequencer);
- futureList = writer.writeBulk(records);
+ futureList = newTFutureList(writer.writeBulk(records));
}
// Collect into a list of tries to make it easier to extract exception or DLSN.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
index 3ecb46f..24ce0be 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/DeleteOp.java
@@ -17,13 +17,13 @@
*/
package org.apache.distributedlog.service.stream;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.RequestDeniedException;
import org.apache.distributedlog.service.ResponseUtils;
import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.Future;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.Counter;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
index 0ffa619..c9dec80 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/HeartbeatOp.java
@@ -18,8 +18,9 @@
package org.apache.distributedlog.service.stream;
import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.acl.AccessControlManager;
@@ -27,7 +28,7 @@ import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.RequestDeniedException;
import org.apache.distributedlog.service.ResponseUtils;
import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.Future;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.Counter;
@@ -78,7 +79,7 @@ public class HeartbeatOp extends AbstractWriteOp {
txnId = sequencer.nextId();
LogRecord hbRecord = new LogRecord(txnId, HEARTBEAT_DATA);
hbRecord.setControl();
- writeResult = writer.write(hbRecord);
+ writeResult = newTFuture(writer.write(hbRecord));
}
return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
index 6ec8642..d657660 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/ReleaseOp.java
@@ -17,13 +17,13 @@
*/
package org.apache.distributedlog.service.stream;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.RequestDeniedException;
import org.apache.distributedlog.service.ResponseUtils;
import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.Future;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.Counter;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
index 2b90d55..98362b5 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamFactoryImpl.java
@@ -18,8 +18,8 @@
package org.apache.distributedlog.service.stream;
import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.service.FatalErrorHandler;
import org.apache.distributedlog.service.config.ServerConfiguration;
import org.apache.distributedlog.service.config.StreamConfigProvider;
@@ -40,7 +40,7 @@ public class StreamFactoryImpl implements StreamFactory {
private final FeatureProvider featureProvider;
private final StreamConfigProvider streamConfigProvider;
private final StreamPartitionConverter streamPartitionConverter;
- private final DistributedLogNamespace dlNamespace;
+ private final Namespace dlNamespace;
private final OrderedScheduler scheduler;
private final FatalErrorHandler fatalErrorHandler;
private final HashedWheelTimer requestTimer;
@@ -53,7 +53,7 @@ public class StreamFactoryImpl implements StreamFactory {
FeatureProvider featureProvider,
StreamConfigProvider streamConfigProvider,
StreamPartitionConverter streamPartitionConverter,
- DistributedLogNamespace dlNamespace,
+ Namespace dlNamespace,
OrderedScheduler scheduler,
FatalErrorHandler fatalErrorHandler,
HashedWheelTimer requestTimer) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
index c0c0972..df3d64f 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java
@@ -20,9 +20,10 @@ package org.apache.distributedlog.service.stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.DLException;
@@ -33,15 +34,16 @@ import org.apache.distributedlog.exceptions.StreamNotReadyException;
import org.apache.distributedlog.exceptions.StreamUnavailableException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.io.Abortables;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.protocol.util.TwitterFutureUtils;
import org.apache.distributedlog.service.FatalErrorHandler;
import org.apache.distributedlog.service.ServerFeatureKeys;
import org.apache.distributedlog.service.config.ServerConfiguration;
import org.apache.distributedlog.service.config.StreamConfigProvider;
import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter;
import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.TimeSequencer;
import org.apache.distributedlog.util.Utils;
@@ -50,7 +52,6 @@ import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
-import com.twitter.util.TimeoutException;
import com.twitter.util.Timer;
import java.io.IOException;
import java.util.ArrayDeque;
@@ -126,7 +127,7 @@ public class StreamImpl implements Stream {
private final StreamRequestLimiter limiter;
private final DynamicDistributedLogConfiguration dynConf;
private final DistributedLogConfiguration dlConfig;
- private final DistributedLogNamespace dlNamespace;
+ private final Namespace dlNamespace;
private final String clientId;
private final OrderedScheduler scheduler;
private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
@@ -169,7 +170,7 @@ public class StreamImpl implements Stream {
DynamicDistributedLogConfiguration streamConf,
FeatureProvider featureProvider,
StreamConfigProvider streamConfigProvider,
- DistributedLogNamespace dlNamespace,
+ Namespace dlNamespace,
OrderedScheduler scheduler,
FatalErrorHandler fatalErrorHandler,
HashedWheelTimer requestTimer,
@@ -555,8 +556,8 @@ public class StreamImpl implements Stream {
Future<Boolean> acquireStream() {
final Stopwatch stopwatch = Stopwatch.createStarted();
final Promise<Boolean> acquirePromise = new Promise<Boolean>();
- manager.openAsyncLogWriter().addEventListener(
- FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
+ manager.openAsyncLogWriter().whenCompleteAsync(
+ new org.apache.distributedlog.common.concurrent.FutureEventListener<AsyncLogWriter>() {
@Override
public void onSuccess(AsyncLogWriter w) {
@@ -568,7 +569,7 @@ public class StreamImpl implements Stream {
onAcquireStreamFailure(cause, stopwatch, acquirePromise);
}
- }, scheduler, getStreamName()));
+ }, scheduler.chooseExecutor(getStreamName()));
return acquirePromise;
}
@@ -662,7 +663,7 @@ public class StreamImpl implements Stream {
pendingOpsCounter.dec();
}
Abortables.asyncAbort(oldWriter, true);
- FutureUtils.setValue(acquirePromise, success);
+ TwitterFutureUtils.setValue(acquirePromise, success);
}
//
@@ -802,7 +803,7 @@ public class StreamImpl implements Stream {
logger.info("Removed cached stream {}.", getStreamName());
}
}
- FutureUtils.setValue(closePromise, null);
+ TwitterFutureUtils.setValue(closePromise, null);
}
/**
@@ -825,7 +826,7 @@ public class StreamImpl implements Stream {
}
logger.info("Closing stream {} ...", name);
// Close the writers to release the locks before failing the requests
- Future<Void> closeWriterFuture;
+ CompletableFuture<Void> closeWriterFuture;
if (abort) {
closeWriterFuture = Abortables.asyncAbort(writer, true);
} else {
@@ -839,25 +840,38 @@ public class StreamImpl implements Stream {
closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs);
}
- FutureUtils.stats(
+ CompletableFuture<Void> maskedFuture = FutureUtils.createFuture();
+ FutureUtils.proxyTo(
+ FutureUtils.stats(
closeWriterFuture,
writerCloseStatLogger,
Stopwatch.createStarted()
- ).masked().within(futureTimer, closeWaitDuration)
- .addEventListener(FutureUtils.OrderedFutureEventListener.of(
- new FutureEventListener<Void>() {
- @Override
- public void onSuccess(Void value) {
- postClose(uncache);
- }
- @Override
- public void onFailure(Throwable cause) {
- if (cause instanceof TimeoutException) {
- writerCloseTimeoutCounter.inc();
- }
- postClose(uncache);
+ ),
+ maskedFuture);
+
+ FutureUtils.within(
+ maskedFuture,
+ closeWaitDuration.inMillis(),
+ TimeUnit.MILLISECONDS,
+ new java.util.concurrent.TimeoutException("Timeout on closing"),
+ scheduler,
+ name
+ ).whenCompleteAsync(
+ new org.apache.distributedlog.common.concurrent.FutureEventListener<Void>() {
+ @Override
+ public void onSuccess(Void value) {
+ postClose(uncache);
+ }
+
+ @Override
+ public void onFailure(Throwable cause) {
+ if (cause instanceof java.util.concurrent.TimeoutException) {
+ writerCloseTimeoutCounter.inc();
}
- }, scheduler, name));
+ }
+ },
+ scheduler.chooseExecutor(name)
+ );
return closePromise;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
index 5d54738..fd57c17 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamManagerImpl.java
@@ -21,11 +21,11 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.ServiceUnavailableException;
import org.apache.distributedlog.exceptions.StreamUnavailableException;
import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.service.config.StreamConfigProvider;
import org.apache.distributedlog.service.streamset.Partition;
import org.apache.distributedlog.service.streamset.PartitionMap;
@@ -86,7 +86,7 @@ public class StreamManagerImpl implements StreamManager {
private final String clientId;
private boolean closed = false;
private final StreamFactory streamFactory;
- private final DistributedLogNamespace dlNamespace;
+ private final Namespace dlNamespace;
public StreamManagerImpl(String clientId,
DistributedLogConfiguration dlConfig,
@@ -94,7 +94,7 @@ public class StreamManagerImpl implements StreamManager {
StreamFactory streamFactory,
StreamPartitionConverter partitionConverter,
StreamConfigProvider streamConfigProvider,
- DistributedLogNamespace dlNamespace) {
+ Namespace dlNamespace) {
this.clientId = clientId;
this.executorService = executorService;
this.streamFactory = streamFactory;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
index d0b8de4..b608e11 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOp.java
@@ -18,10 +18,10 @@
package org.apache.distributedlog.service.stream;
import com.google.common.base.Stopwatch;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.thrift.service.ResponseHeader;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.Future;
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
index f3fc610..feb2c6a 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamOpStats.java
@@ -18,7 +18,7 @@
package org.apache.distributedlog.service.stream;
import org.apache.distributedlog.service.streamset.Partition;
-import org.apache.distributedlog.stats.BroadCastStatsLogger;
+import org.apache.distributedlog.common.stats.BroadCastStatsLogger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
index 0036a5c..5d6dd1c 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/TruncateOp.java
@@ -17,7 +17,9 @@
*/
package org.apache.distributedlog.service.stream;
-import org.apache.distributedlog.AsyncLogWriter;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture;
+
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.exceptions.DLException;
@@ -25,14 +27,13 @@ import org.apache.distributedlog.exceptions.RequestDeniedException;
import org.apache.distributedlog.protocol.util.ProtocolUtils;
import org.apache.distributedlog.service.ResponseUtils;
import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.Future;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
/**
* Operation to truncate a log stream.
@@ -72,12 +73,7 @@ public class TruncateOp extends AbstractWriteOp {
logger.error("Truncate: Stream Name Mismatch in the Stream Map {}, {}", stream, writer.getStreamName());
return Future.exception(new IllegalStateException("The stream mapping is incorrect, fail the request"));
}
- return writer.truncate(dlsn).map(new AbstractFunction1<Boolean, WriteResponse>() {
- @Override
- public WriteResponse apply(Boolean v1) {
- return ResponseUtils.writeSuccess();
- }
- });
+ return newTFuture(writer.truncate(dlsn).thenApply((value) -> ResponseUtils.writeSuccess()));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
index 2e7ffb8..0a8a2da 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java
@@ -17,7 +17,9 @@
*/
package org.apache.distributedlog.service.stream;
-import org.apache.distributedlog.AsyncLogWriter;
+import static org.apache.distributedlog.protocol.util.TwitterFutureUtils.newTFuture;
+
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.acl.AccessControlManager;
@@ -31,7 +33,7 @@ import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
import org.apache.distributedlog.thrift.service.ResponseHeader;
import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.nio.ByteBuffer;
@@ -150,7 +152,7 @@ public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {
if (isRecordSet) {
record.setRecordSet();
}
- writeResult = writer.write(record);
+ writeResult = newTFuture(writer.write(record));
}
return writeResult.map(new AbstractFunction1<DLSN, WriteResponse>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
index de805aa..549262d 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
@@ -22,7 +22,7 @@ import org.apache.distributedlog.exceptions.OverCapacityException;
import org.apache.distributedlog.limiter.ChainedRequestLimiter;
import org.apache.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
import org.apache.distributedlog.limiter.RequestLimiter;
-import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.common.rate.MovingAverageRate;
import org.apache.distributedlog.service.stream.StreamManager;
import org.apache.distributedlog.service.stream.StreamOp;
import org.apache.bookkeeper.feature.Feature;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
index 7675d6f..2551a5e 100644
--- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
+++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
@@ -20,7 +20,7 @@ package org.apache.distributedlog.service.stream.limiter;
import org.apache.distributedlog.exceptions.OverCapacityException;
import org.apache.distributedlog.exceptions.TooManyStreamsException;
import org.apache.distributedlog.limiter.RequestLimiter;
-import org.apache.distributedlog.rate.MovingAverageRate;
+import org.apache.distributedlog.common.rate.MovingAverageRate;
import org.apache.distributedlog.service.stream.StreamManager;
import org.apache.distributedlog.service.stream.StreamOp;
import org.apache.bookkeeper.stats.Counter;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
index 4a5dd01..16e36c9 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogServerBase.java
@@ -26,23 +26,23 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.base.Optional;
-import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogManager;
-import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.acl.AccessControlManager;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
import org.apache.distributedlog.client.routing.LocalRoutingService;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.impl.acl.ZKAccessControl;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.service.stream.StreamManagerImpl;
import org.apache.distributedlog.thrift.AccessControlEntry;
import org.apache.distributedlog.thrift.service.BulkWriteResponse;
@@ -50,7 +50,7 @@ import org.apache.distributedlog.thrift.service.HeartbeatOptions;
import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.thrift.service.WriteContext;
import org.apache.distributedlog.util.FailpointUtils;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Await;
@@ -105,7 +105,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
HeartbeatOptions hbOptions = new HeartbeatOptions();
hbOptions.setSendHeartBeatToReader(true);
// make sure the first log segment of each stream created
- FutureUtils.result(dlClient.dlClient.heartbeat(name));
+ Await.result(dlClient.dlClient.heartbeat(name));
DistributedLogManager dlm = DLMTestUtil.createNewDLM(name, conf, getUri());
LogReader reader = dlm.getInputStream(1);
@@ -305,7 +305,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
}
validateFailedAsLogRecordTooLong(futures.get(writeCount));
- FutureUtils.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
+ Await.result(Futures.collect(futures.subList(writeCount + 1, 2 * writeCount + 1)));
assertEquals(writeCount, succeeded);
}
@@ -325,7 +325,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
List<Future<DLSN>> futures = dlClient.dlClient.writeBulk(name, writes);
validateFailedAsLogRecordTooLong(futures.get(0));
- FutureUtils.result(Futures.collect(futures.subList(1, writeCount + 1)));
+ Await.result(Futures.collect(futures.subList(1, writeCount + 1)));
}
@Test(timeout = 60000)
@@ -601,7 +601,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
.connectionTimeoutMs(60000)
.sessionTimeoutMs(60000)
.build();
- DistributedLogNamespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
+ Namespace dlNamespace = dlServer.dlServer.getLeft().getDistributedLogNamespace();
BKDLConfig bkdlConfig = BKDLConfig.resolveDLConfig(zkc, getUri());
String zkPath = getUri().getPath() + "/" + bkdlConfig.getACLRootPath() + "/" + name;
ZKAccessControl accessControl = new ZKAccessControl(ace, zkPath);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
index 4a2d65f..60f814e 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/TestDistributedLogService.java
@@ -50,7 +50,6 @@ import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.thrift.service.WriteContext;
import org.apache.distributedlog.thrift.service.WriteResponse;
import org.apache.distributedlog.util.ConfUtils;
-import org.apache.distributedlog.util.FutureUtils;
import com.twitter.util.Await;
import com.twitter.util.Future;
import java.net.URI;
@@ -446,7 +445,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
assertTrue("Write should not fail before closing",
futureList.get(i).isDefined());
WriteResponse response = Await.result(futureList.get(i));
- assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION,
+ assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION
+ + " but " + response.getHeader().getCode() + " is received.",
StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
|| StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
|| StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
@@ -500,7 +500,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
}
assertTrue("Stream " + streamName + " should be cached",
streamManager.getCachedStreams().containsKey(streamName));
- List<WriteResponse> resultList = FutureUtils.result(Future.collect(futureList));
+ List<WriteResponse> resultList = Await.result(Future.collect(futureList));
for (WriteResponse wr : resultList) {
assertEquals(DLSN.InvalidDLSN, DLSN.deserialize(wr.getDlsn()));
}
@@ -689,7 +689,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
HeartbeatOptions hbOptions = new HeartbeatOptions();
hbOptions.setSendHeartBeatToReader(true);
// make sure the first log segment of each stream created
- FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
+ Await.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
for (int j = 0; j < numWrites; j++) {
futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
}
@@ -741,7 +741,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
HeartbeatOptions hbOptions = new HeartbeatOptions();
hbOptions.setSendHeartBeatToReader(true);
// make sure the first log segment of each stream created
- FutureUtils.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
+ Await.result(localService.heartbeatWithOptions(streamName, new WriteContext(), hbOptions));
for (int j = 0; j < numWrites; j++) {
futureList.add(localService.write(streamName, createRecord(i * numWrites + j)));
}
@@ -803,7 +803,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
service.startPlacementPolicy();
- WriteResponse response = FutureUtils.result(service.getOwner("stream-1", new WriteContext()));
+ WriteResponse response = Await.result(service.getOwner("stream-1", new WriteContext()));
assertEquals(StatusCode.FOUND, response.getHeader().getCode());
assertEquals(service.getServiceAddress().toString(),
response.getHeader().getLocation());
@@ -824,7 +824,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
assertNull(stream.getLastException());
// the stream is acquired
- response = FutureUtils.result(service.getOwner("stream-2", new WriteContext()));
+ response = Await.result(service.getOwner("stream-2", new WriteContext()));
assertEquals(StatusCode.FOUND, response.getHeader().getCode());
assertEquals(service.getServiceAddress().toString(),
response.getHeader().getLocation());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
index 5f5ecd4..5b8e3a6 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Future;
@@ -55,7 +55,7 @@ public class TestLeastLoadPlacementPolicy {
int numSevers = new Random().nextInt(20) + 1;
int numStreams = new Random().nextInt(200) + 1;
RoutingService mockRoutingService = mock(RoutingService.class);
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+ Namespace mockNamespace = mock(Namespace.class);
LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
new EqualLoadAppraiser(),
mockRoutingService,
@@ -81,7 +81,7 @@ public class TestLeastLoadPlacementPolicy {
int numStreams = new Random().nextInt(200) + 1;
RoutingService mockRoutingService = mock(RoutingService.class);
when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+ Namespace mockNamespace = mock(Namespace.class);
try {
when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
} catch (IOException e) {
@@ -112,7 +112,7 @@ public class TestLeastLoadPlacementPolicy {
/* use AtomicInteger to have a final object in answer method */
final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
RoutingService mockRoutingService = mock(RoutingService.class);
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+ Namespace mockNamespace = mock(Namespace.class);
LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
index 56e9483..2e87b71 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamManager.java
@@ -26,8 +26,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.service.config.StreamConfigProvider;
import org.apache.distributedlog.service.streamset.Partition;
import org.apache.distributedlog.service.streamset.StreamPartitionConverter;
@@ -67,7 +67,7 @@ public class TestStreamManager {
mockStreamFactory,
mockPartitionConverter,
mockStreamConfigProvider,
- mock(DistributedLogNamespace.class));
+ mock(Namespace.class));
assertFalse(streamManager.isAcquired("stream1"));
assertEquals(0, streamManager.numAcquired());
@@ -117,7 +117,7 @@ public class TestStreamManager {
(DynamicDistributedLogConfiguration) any(),
(StreamManager) any())
).thenReturn(mockStream);
- DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class);
+ Namespace dlNamespace = mock(Namespace.class);
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
StreamManager streamManager = new StreamManagerImpl(
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
index a18fda1..dc861a4 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/TestStreamOp.java
@@ -22,7 +22,7 @@ import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import org.apache.distributedlog.AsyncLogWriter;
+import org.apache.distributedlog.api.AsyncLogWriter;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.acl.DefaultAccessControlManager;
@@ -32,9 +32,9 @@ import org.apache.distributedlog.service.config.ServerConfiguration;
import org.apache.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.thrift.service.WriteResponse;
-import org.apache.distributedlog.util.Sequencer;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.common.util.Sequencer;
import com.twitter.util.Await;
-import com.twitter.util.Future;
import java.nio.ByteBuffer;
import org.apache.bookkeeper.feature.SettableFeature;
import org.apache.bookkeeper.stats.NullStatsLogger;
@@ -79,7 +79,7 @@ public class TestStreamOp {
@Test(timeout = 60000)
public void testResponseSucceededThenFailed() throws Exception {
AsyncLogWriter writer = mock(AsyncLogWriter.class);
- when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3)));
+ when(writer.write((LogRecord) any())).thenReturn(FutureUtils.value(new DLSN(1, 2, 3)));
when(writer.getStreamName()).thenReturn("test");
WriteOp writeOp = getWriteOp();
writeOp.execute(writer, new Sequencer() {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
index 431bfa4..ccf3188 100644
--- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
+++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.OverCapacityException;
import org.apache.distributedlog.limiter.ChainedRequestLimiter;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties b/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties
index 56a6417..7aa93f6 100644
--- a/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties
+++ b/distributedlog-tutorials/distributedlog-basic/conf/log4j.properties
@@ -30,11 +30,7 @@ log4j.logger.org.apache.zookeeper=INFO
log4j.logger.org.apache.bookkeeper=INFO
# redirect executor output to executors.log since slow op warnings can be quite verbose
-log4j.logger.org.apache.distributedlog.util.MonitoredFuturePool=INFO, Executors
-log4j.logger.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=INFO, Executors
log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
-log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
-log4j.additivity.org.apache.distributedlog.util.MonitoredScheduledThreadPoolExecutor=false
log4j.additivity.org.apache.bookkeeper.util.SafeRunnable=false
log4j.appender.Executors=org.apache.log4j.RollingFileAppender