You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:37 UTC
[05/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java
index 9258922..da4ef81 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestSequenceID.java
@@ -18,8 +18,9 @@
package org.apache.distributedlog;
import org.apache.distributedlog.LogSegmentMetadata.LogSegmentMetadataVersion;
-import com.twitter.util.Await;
-import com.twitter.util.FutureEventListener;
+import org.apache.distributedlog.api.AsyncLogReader;
+import org.apache.distributedlog.common.concurrent.FutureEventListener;
+import org.apache.distributedlog.util.Utils;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,7 +73,7 @@ public class TestSequenceID extends TestDistributedLogBase {
BKDistributedLogManager dlm = (BKDistributedLogManager) createNewDLM(confLocal, name);
BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(0L)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(0L)));
dlm.close();
@@ -126,16 +127,16 @@ public class TestSequenceID extends TestDistributedLogBase {
for (int i = 0; i < 3; i++) {
BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
for (int j = 0; j < 2; j++) {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
if (null == reader) {
reader = readDLM.getAsyncLogReader(DLSN.InitialDLSN);
final AsyncLogReader r = reader;
- reader.readNext().addEventListener(new FutureEventListener<LogRecordWithDLSN>() {
+ reader.readNext().whenComplete(new FutureEventListener<LogRecordWithDLSN>() {
@Override
public void onSuccess(LogRecordWithDLSN record) {
readRecords.add(record);
- r.readNext().addEventListener(this);
+ r.readNext().whenComplete(this);
}
@Override
@@ -149,7 +150,7 @@ public class TestSequenceID extends TestDistributedLogBase {
}
BKAsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txId++)));
List<LogSegmentMetadata> segments = dlm.getLogSegments();
assertEquals(4, segments.size());
@@ -174,12 +175,12 @@ public class TestSequenceID extends TestDistributedLogBase {
for (int i = 0; i < 3; i++) {
BKAsyncLogWriter writerv5 = dlmv5.startAsyncLogSegmentNonPartitioned();
for (int j = 0; j < 2; j++) {
- Await.result(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
+ Utils.ioResult(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
}
writerv5.closeAndComplete();
}
BKAsyncLogWriter writerv5 = dlmv5.startAsyncLogSegmentNonPartitioned();
- Await.result(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
+ Utils.ioResult(writerv5.write(DLMTestUtil.getLogRecordInstance(txId++)));
List<LogSegmentMetadata> segmentsv5 = dlmv5.getLogSegments();
assertEquals(8, segmentsv5.size());
@@ -205,7 +206,7 @@ public class TestSequenceID extends TestDistributedLogBase {
for (int i = 0; i < 3; i++) {
BKAsyncLogWriter writerv4 = dlmv4.startAsyncLogSegmentNonPartitioned();
for (int j = 0; j < 2; j++) {
- Await.result(writerv4.write(DLMTestUtil.getLogRecordInstance(txId++)));
+ Utils.ioResult(writerv4.write(DLMTestUtil.getLogRecordInstance(txId++)));
}
writerv4.closeAndComplete();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java
index 5b26a70..06708c8 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestTruncate.java
@@ -22,7 +22,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.distributedlog.util.FutureUtils;
+import org.apache.distributedlog.api.AsyncLogWriter;
+import org.apache.distributedlog.api.DistributedLogManager;
+import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.util.Utils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -31,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.distributedlog.LogSegmentMetadata.TruncationStatus;
-import com.twitter.util.Await;
import static org.junit.Assert.*;
@@ -96,11 +97,11 @@ public class TestTruncate extends TestDistributedLogBase {
AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
long txid = 1 + 10 * 10;
for (int j = 1; j <= 10; j++) {
- Await.result(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(txid++)));
}
// to make sure the truncation task is executed
- DLSN lastDLSN = Await.result(dlm.getLastDLSNAsync());
+ DLSN lastDLSN = Utils.ioResult(dlm.getLastDLSNAsync());
LOG.info("Get last dlsn of stream {} : {}", name, lastDLSN);
assertEquals(6, distributedLogManager.getLogSegments().size());
@@ -123,20 +124,20 @@ public class TestTruncate extends TestDistributedLogBase {
Thread.sleep(1000);
// delete invalid dlsn
- assertFalse(Await.result(pair.getRight().truncate(DLSN.InvalidDLSN)));
+ assertFalse(Utils.ioResult(pair.getRight().truncate(DLSN.InvalidDLSN)));
verifyEntries(name, 1, 1, 5 * 10);
for (int i = 1; i <= 4; i++) {
int txn = (i-1) * 10 + i;
DLSN dlsn = txid2DLSN.get((long)txn);
- assertTrue(Await.result(pair.getRight().truncate(dlsn)));
+ assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn)));
verifyEntries(name, 1, (i - 1) * 10 + 1, (5 - i + 1) * 10);
}
// Delete higher dlsn
int txn = 43;
DLSN dlsn = txid2DLSN.get((long) txn);
- assertTrue(Await.result(pair.getRight().truncate(dlsn)));
+ assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn)));
verifyEntries(name, 1, 41, 10);
Utils.close(pair.getRight());
@@ -160,14 +161,14 @@ public class TestTruncate extends TestDistributedLogBase {
for (int i = 1; i <= 4; i++) {
int txn = (i-1) * 10 + i;
DLSN dlsn = txid2DLSN.get((long)txn);
- assertTrue(Await.result(pair.getRight().truncate(dlsn)));
+ assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn)));
verifyEntries(name, 1, (i - 1) * 10 + 1, (5 - i + 1) * 10);
}
// Delete higher dlsn
int txn = 43;
DLSN dlsn = txid2DLSN.get((long) txn);
- assertTrue(Await.result(pair.getRight().truncate(dlsn)));
+ assertTrue(Utils.ioResult(pair.getRight().truncate(dlsn)));
verifyEntries(name, 1, 41, 10);
Utils.close(pair.getRight());
@@ -176,7 +177,7 @@ public class TestTruncate extends TestDistributedLogBase {
// Try force truncation
BKDistributedLogManager dlm = (BKDistributedLogManager)createNewDLM(confLocal, name);
BKLogWriteHandler handler = dlm.createWriteHandler(true);
- FutureUtils.result(handler.purgeLogSegmentsOlderThanTxnId(Integer.MAX_VALUE));
+ Utils.ioResult(handler.purgeLogSegmentsOlderThanTxnId(Integer.MAX_VALUE));
verifyEntries(name, 1, 41, 10);
}
@@ -230,11 +231,11 @@ public class TestTruncate extends TestDistributedLogBase {
AsyncLogWriter newWriter = newDLM.startAsyncLogSegmentNonPartitioned();
long txid = 1 + 4 * 10;
for (int j = 1; j <= 10; j++) {
- Await.result(newWriter.write(DLMTestUtil.getLogRecordInstance(txid++)));
+ Utils.ioResult(newWriter.write(DLMTestUtil.getLogRecordInstance(txid++)));
}
// to make sure the truncation task is executed
- DLSN lastDLSN = Await.result(newDLM.getLastDLSNAsync());
+ DLSN lastDLSN = Utils.ioResult(newDLM.getLastDLSNAsync());
LOG.info("Get last dlsn of stream {} : {}", name, lastDLSN);
assertEquals(5, newDLM.getLogSegments().size());
@@ -277,7 +278,7 @@ public class TestTruncate extends TestDistributedLogBase {
DistributedLogManager newDLM = createNewDLM(confLocal, name);
AsyncLogWriter newWriter = newDLM.startAsyncLogSegmentNonPartitioned();
- Await.result(newWriter.truncate(dlsnMap.get(15L)));
+ Utils.ioResult(newWriter.truncate(dlsnMap.get(15L)));
List<LogSegmentMetadata> newSegments2 = newDLM.getLogSegments();
assertArrayEquals(newSegments.toArray(new LogSegmentMetadata[4]),
@@ -299,7 +300,7 @@ public class TestTruncate extends TestDistributedLogBase {
AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
for (int j = 1; j <= numEntriesPerLogSegment; j++) {
long curTxId = txid++;
- DLSN dlsn = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
+ DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
txid2DLSN.put(curTxId, dlsn);
}
Utils.close(writer);
@@ -311,7 +312,7 @@ public class TestTruncate extends TestDistributedLogBase {
AsyncLogWriter writer = dlm.startAsyncLogSegmentNonPartitioned();
for (int j = 1; j <= 10; j++) {
long curTxId = txid++;
- DLSN dlsn = Await.result(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
+ DLSN dlsn = Utils.ioResult(writer.write(DLMTestUtil.getLogRecordInstance(curTxId)));
txid2DLSN.put(curTxId, dlsn);
}
return new ImmutablePair<DistributedLogManager, AsyncLogWriter>(dlm, writer);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java
index c28437f..0d0ca99 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestWriteLimiter.java
@@ -18,7 +18,6 @@
package org.apache.distributedlog;
import org.apache.distributedlog.exceptions.OverCapacityException;
-import org.apache.distributedlog.util.PermitLimiter;
import org.apache.distributedlog.util.SimplePermitLimiter;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.SettableFeature;
@@ -30,7 +29,6 @@ import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
-import scala.runtime.BoxedUnit;
public class TestWriteLimiter {
static final Logger LOG = LoggerFactory.getLogger(TestWriteLimiter.class);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java
index a1c075f..75bcda2 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestZooKeeperClient.java
@@ -19,7 +19,7 @@ package org.apache.distributedlog;
import org.apache.distributedlog.ZooKeeperClient.Credentials;
import org.apache.distributedlog.ZooKeeperClient.DigestCredentials;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.zookeeper.CreateMode;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java
index 8d88a37..45fc1f3 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControl.java
@@ -17,20 +17,20 @@
*/
package org.apache.distributedlog.acl;
+import java.net.URI;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.apache.distributedlog.impl.acl.ZKAccessControl;
import org.apache.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.net.URI;
-
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.*;
@@ -60,14 +60,14 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
ace.setDenyWrite(true);
String zkPath = "/create-zk-access-control";
ZKAccessControl zkac = new ZKAccessControl(ace, zkPath);
- Await.result(zkac.create(zkc));
+ Utils.ioResult(zkac.create(zkc));
- ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+ ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
assertEquals(zkac, readZKAC);
ZKAccessControl another = new ZKAccessControl(ace, zkPath);
try {
- Await.result(another.create(zkc));
+ FutureUtils.result(another.create(zkc));
} catch (KeeperException.NodeExistsException ke) {
// expected
}
@@ -81,19 +81,19 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
ace.setDenyDelete(true);
ZKAccessControl zkac = new ZKAccessControl(ace, zkPath);
- Await.result(zkac.create(zkc));
+ Utils.ioResult(zkac.create(zkc));
- ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+ ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
assertEquals(zkac, readZKAC);
- Await.result(ZKAccessControl.delete(zkc, zkPath));
+ Utils.ioResult(ZKAccessControl.delete(zkc, zkPath));
try {
- Await.result(ZKAccessControl.read(zkc, zkPath, null));
+ FutureUtils.result(ZKAccessControl.read(zkc, zkPath, null));
} catch (KeeperException.NoNodeException nne) {
// expected.
}
- Await.result(ZKAccessControl.delete(zkc, zkPath));
+ Utils.ioResult(ZKAccessControl.delete(zkc, zkPath));
}
@Test(timeout = 60000)
@@ -102,7 +102,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
zkc.get().create(zkPath, new byte[0], zkc.getDefaultACL(), CreateMode.PERSISTENT);
- ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+ ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
assertEquals(zkPath, readZKAC.getZKPath());
assertEquals(ZKAccessControl.DEFAULT_ACCESS_CONTROL_ENTRY, readZKAC.getAccessControlEntry());
@@ -116,7 +116,7 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
zkc.get().create(zkPath, "corrupted-data".getBytes(UTF_8), zkc.getDefaultACL(), CreateMode.PERSISTENT);
try {
- Await.result(ZKAccessControl.read(zkc, zkPath, null));
+ Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
} catch (ZKAccessControl.CorruptedAccessControlException cace) {
// expected
}
@@ -130,25 +130,25 @@ public class TestZKAccessControl extends ZooKeeperClusterTestCase {
ace.setDenyDelete(true);
ZKAccessControl zkac = new ZKAccessControl(ace, zkPath);
- Await.result(zkac.create(zkc));
+ Utils.ioResult(zkac.create(zkc));
- ZKAccessControl readZKAC = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+ ZKAccessControl readZKAC = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
assertEquals(zkac, readZKAC);
ace.setDenyRelease(true);
ZKAccessControl newZKAC = new ZKAccessControl(ace, zkPath);
- Await.result(newZKAC.update(zkc));
- ZKAccessControl readZKAC2 = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+ Utils.ioResult(newZKAC.update(zkc));
+ ZKAccessControl readZKAC2 = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
assertEquals(newZKAC, readZKAC2);
try {
- Await.result(readZKAC.update(zkc));
+ FutureUtils.result(readZKAC.update(zkc));
} catch (KeeperException.BadVersionException bve) {
// expected
}
readZKAC2.getAccessControlEntry().setDenyTruncate(true);
- Await.result(readZKAC2.update(zkc));
- ZKAccessControl readZKAC3 = Await.result(ZKAccessControl.read(zkc, zkPath, null));
+ Utils.ioResult(readZKAC2.update(zkc));
+ ZKAccessControl readZKAC3 = Utils.ioResult(ZKAccessControl.read(zkc, zkPath, null));
assertEquals(readZKAC2, readZKAC3);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java
index 19c301b..868549e 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/acl/TestZKAccessControlManager.java
@@ -25,7 +25,7 @@ import org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.apache.distributedlog.impl.acl.ZKAccessControl;
import org.apache.distributedlog.impl.acl.ZKAccessControlManager;
import org.apache.distributedlog.thrift.AccessControlEntry;
-import com.twitter.util.Await;
+import org.apache.distributedlog.util.Utils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -136,7 +136,7 @@ public class TestZKAccessControlManager extends ZooKeeperClusterTestCase {
verifyStreamPermissions(zkcm, stream2, true, false, true, true, true);
// delete stream2
- Await.result(ZKAccessControl.delete(zkc, zkPath2));
+ Utils.ioResult(ZKAccessControl.delete(zkc, zkPath2));
logger.info("Delete ACL for stream {}", stream2);
while (!zkcm.allowTruncate(stream2)) {
Thread.sleep(100);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
index 4f968b6..8a2c476 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDLCK.java
@@ -17,21 +17,20 @@
*/
package org.apache.distributedlog.admin;
-import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.util.OrderedScheduler;
-import org.apache.distributedlog.util.SchedulerUtils;
+import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
@@ -105,7 +104,7 @@ public class TestDLCK extends TestDistributedLogBase {
confLocal.setLogSegmentCacheEnabled(false);
URI uri = createDLMURI("/check-and-repair-dl-namespace");
zkc.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(confLocal)
.uri(uri)
.build();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
index f911f15..f7f859c 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/admin/TestDistributedLogAdmin.java
@@ -18,14 +18,14 @@
package org.apache.distributedlog.admin;
import java.net.URI;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
import org.apache.distributedlog.exceptions.UnexpectedException;
-import org.apache.distributedlog.namespace.DistributedLogNamespace;
-import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
@@ -36,19 +36,16 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.distributedlog.AsyncLogReader;
+import org.apache.distributedlog.api.AsyncLogReader;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.LogRecord;
import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.metadata.DryrunLogSegmentMetadataStoreUpdater;
import org.apache.distributedlog.metadata.LogSegmentMetadataStoreUpdater;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
import static org.junit.Assert.*;
@@ -92,11 +89,11 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
URI uri = createDLMURI("/change-sequence-number");
zooKeeperClient.get().create(uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace namespace = NamespaceBuilder.newBuilder()
.conf(confLocal)
.uri(uri)
.build();
- DistributedLogNamespace readNamespace = DistributedLogNamespaceBuilder.newBuilder()
+ Namespace readNamespace = NamespaceBuilder.newBuilder()
.conf(readConf)
.uri(uri)
.build();
@@ -117,7 +114,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
long expectedTxId = 1L;
DLSN lastDLSN = DLSN.InitialDLSN;
for (int i = 0; i < 4 * 10; i++) {
- LogRecordWithDLSN record = Await.result(reader.readNext());
+ LogRecordWithDLSN record = Utils.ioResult(reader.readNext());
assertNotNull(record);
DLMTestUtil.verifyLogRecord(record);
assertEquals(expectedTxId, record.getTransactionId());
@@ -133,9 +130,9 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
LOG.info("Injected bad log segment '3'");
// there isn't records should be read
- Future<LogRecordWithDLSN> readFuture = reader.readNext();
+ CompletableFuture<LogRecordWithDLSN> readFuture = reader.readNext();
try {
- LogRecordWithDLSN record = Await.result(readFuture);
+ LogRecordWithDLSN record = Utils.ioResult(readFuture);
fail("Should fail reading next record "
+ record
+ " when there is a corrupted log segment");
@@ -151,7 +148,7 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
try {
reader = readDLM.getAsyncLogReader(lastDLSN);
- Await.result(reader.readNext());
+ Utils.ioResult(reader.readNext());
fail("Should fail reading next when there is a corrupted log segment");
} catch (UnexpectedException ue) {
// expected
@@ -166,18 +163,18 @@ public class TestDistributedLogAdmin extends TestDistributedLogBase {
// be able to read more after fix
reader = readDLM.getAsyncLogReader(lastDLSN);
// skip the first record
- Await.result(reader.readNext());
+ Utils.ioResult(reader.readNext());
readFuture = reader.readNext();
expectedTxId = 51L;
- LogRecord record = Await.result(readFuture);
+ LogRecord record = Utils.ioResult(readFuture);
assertNotNull(record);
DLMTestUtil.verifyLogRecord(record);
assertEquals(expectedTxId, record.getTransactionId());
expectedTxId++;
for (int i = 1; i < 10; i++) {
- record = Await.result(reader.readNext());
+ record = Utils.ioResult(reader.readNext());
assertNotNull(record);
DLMTestUtil.verifyLogRecord(record);
assertEquals(expectedTxId, record.getTransactionId());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
index 2492c06..925cad5 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocator.java
@@ -17,22 +17,21 @@
*/
package org.apache.distributedlog.bk;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
import org.apache.distributedlog.bk.SimpleLedgerAllocator.AllocationException;
import org.apache.distributedlog.bk.SimpleLedgerAllocator.Phase;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.exceptions.ZKException;
-import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.Transaction.OpListener;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.DefaultZKOp;
import org.apache.distributedlog.zk.ZKTransaction;
-import com.twitter.util.Future;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
@@ -53,7 +52,6 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashSet;
@@ -116,13 +114,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
return new ZKTransaction(zkc);
}
- private SimpleLedgerAllocator createAllocator(String allocationPath) throws IOException {
+ private SimpleLedgerAllocator createAllocator(String allocationPath) throws Exception {
return createAllocator(allocationPath, dlConf);
}
private SimpleLedgerAllocator createAllocator(String allocationPath,
- DistributedLogConfiguration conf) throws IOException {
- return FutureUtils.result(SimpleLedgerAllocator.of(allocationPath, null, newQuorumConfigProvider(conf), zkc, bkc));
+ DistributedLogConfiguration conf) throws Exception {
+ return Utils.ioResult(SimpleLedgerAllocator.of(allocationPath, null, newQuorumConfigProvider(conf), zkc, bkc));
}
/**
@@ -136,13 +134,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
SimpleLedgerAllocator allocator = createAllocator(allocationPath);
allocator.allocate();
ZKTransaction txn = newTxn();
- LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
logger.info("Try obtaining ledger handle {}", lh.getId());
byte[] data = zkc.get().getData(allocationPath, false, null);
assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
try {
- FutureUtils.result(txn.execute());
+ Utils.ioResult(txn.execute());
fail("Should fail the transaction when setting unexisted path");
} catch (ZKException ke) {
// expected
@@ -154,9 +152,9 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
// Create new transaction to obtain the ledger again.
txn = newTxn();
// we could obtain the ledger if it was obtained
- LedgerHandle newLh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
+ LedgerHandle newLh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
assertEquals(lh.getId(), newLh.getId());
- FutureUtils.result(txn.execute());
+ Utils.ioResult(txn.execute());
data = zkc.get().getData(allocationPath, false, null);
assertEquals(0, data.length);
Utils.close(allocator);
@@ -177,16 +175,16 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
allocator1.allocate();
// wait until allocated
ZKTransaction txn1 = newTxn();
- LedgerHandle lh = FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER));
+ LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
allocator2.allocate();
ZKTransaction txn2 = newTxn();
try {
- FutureUtils.result(allocator2.tryObtain(txn2, NULL_LISTENER));
+ Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
- } catch (ZKException zke) {
- assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
+ } catch (ZKException ke) {
+ assertEquals(KeeperException.Code.BADVERSION, ke.getKeeperExceptionCode());
}
- FutureUtils.result(txn1.execute());
+ Utils.ioResult(txn1.execute());
Utils.close(allocator1);
Utils.close(allocator2);
@@ -217,7 +215,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
ZKTransaction txn1 = newTxn();
try {
- FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER));
+ Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
fail("Should fail allocating ledger if there aren't enough bookies");
} catch (AllocationException ioe) {
// expected
@@ -241,7 +239,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
allocator1.allocate();
// wait until allocated
ZKTransaction txn1 = newTxn();
- LedgerHandle lh1 = FutureUtils.result(allocator1.tryObtain(txn1, NULL_LISTENER));
+ LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
// Second allocator kicks in
stat = new Stat();
@@ -252,16 +250,16 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
allocator2.allocate();
// wait until allocated
ZKTransaction txn2 = newTxn();
- LedgerHandle lh2 = FutureUtils.result(allocator2.tryObtain(txn2, NULL_LISTENER));
+ LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
// should fail to commit txn1 as version is changed by second allocator
try {
- FutureUtils.result(txn1.execute());
+ Utils.ioResult(txn1.execute());
fail("Should fail commit obtaining ledger handle from first allocator as allocator is modified by second allocator.");
} catch (ZKException ke) {
// as expected
}
- FutureUtils.result(txn2.execute());
+ Utils.ioResult(txn2.execute());
Utils.close(allocator1);
Utils.close(allocator2);
@@ -298,7 +296,7 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
allocator.allocate();
ZKTransaction txn = newTxn();
// close during obtaining ledger.
- LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
Utils.close(allocator);
byte[] data = zkc.get().getData(allocationPath, false, null);
assertEquals((Long) lh.getId(), Long.valueOf(new String(data, UTF_8)));
@@ -319,8 +317,8 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
allocator.allocate();
ZKTransaction txn = newTxn();
// close during obtaining ledger.
- LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
- FutureUtils.result(txn.execute());
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ Utils.ioResult(txn.execute());
Utils.close(allocator);
byte[] data = zkc.get().getData(allocationPath, false, null);
assertEquals(0, data.length);
@@ -336,10 +334,10 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
allocator.allocate();
ZKTransaction txn = newTxn();
// close during obtaining ledger.
- LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(UTF_8), -1), null));
try {
- FutureUtils.result(txn.execute());
+ Utils.ioResult(txn.execute());
fail("Should fail the transaction when setting unexisted path");
} catch (ZKException ke) {
// expected
@@ -358,13 +356,13 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
SimpleLedgerAllocator allocator = createAllocator(allcationPath);
allocator.allocate();
ZKTransaction txn1 = newTxn();
- Future<LedgerHandle> obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
+ CompletableFuture<LedgerHandle> obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
ZKTransaction txn2 = newTxn();
- Future<LedgerHandle> obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
- assertTrue(obtainFuture2.isDefined());
- assertTrue(obtainFuture2.isThrow());
+ CompletableFuture<LedgerHandle> obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
+ assertTrue(obtainFuture2.isDone());
+ assertTrue(obtainFuture2.isCompletedExceptionally());
try {
- FutureUtils.result(obtainFuture2);
+ Utils.ioResult(obtainFuture2);
fail("Should fail the concurrent obtain since there is already a transaction obtaining the ledger handle");
} catch (SimpleLedgerAllocator.ConcurrentObtainException cbe) {
// expected
@@ -380,8 +378,8 @@ public class TestLedgerAllocator extends TestDistributedLogBase {
for (int i = 0; i < numLedgers; i++) {
allocator.allocate();
ZKTransaction txn = newTxn();
- LedgerHandle lh = FutureUtils.result(allocator.tryObtain(txn, NULL_LISTENER));
- FutureUtils.result(txn.execute());
+ LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
+ Utils.ioResult(txn.execute());
allocatedLedgers.add(lh);
}
assertEquals(numLedgers, allocatedLedgers.size());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java
index e1aaa0b..a42d688 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/bk/TestLedgerAllocatorPool.java
@@ -24,7 +24,6 @@ import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.Transaction.OpListener;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.ZKTransaction;
@@ -127,7 +126,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
for (int i = 0; i < numAllocators; i++) {
try {
pool.allocate();
- FutureUtils.result(pool.tryObtain(newTxn(), NULL_LISTENER));
+ Utils.ioResult(pool.tryObtain(newTxn(), NULL_LISTENER));
fail("Should fail to allocate ledger if there are enought bookies");
} catch (SimpleLedgerAllocator.AllocationException ae) {
assertEquals(SimpleLedgerAllocator.Phase.ERROR, ae.getPhase());
@@ -136,7 +135,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
for (int i = 0; i < numAllocators; i++) {
try {
pool.allocate();
- FutureUtils.result(pool.tryObtain(newTxn(), NULL_LISTENER));
+ Utils.ioResult(pool.tryObtain(newTxn(), NULL_LISTENER));
fail("Should fail to allocate ledger if there aren't available allocators");
} catch (SimpleLedgerAllocator.AllocationException ae) {
assertEquals(SimpleLedgerAllocator.Phase.ERROR, ae.getPhase());
@@ -159,7 +158,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
for (int i = 0; i < numAllocators; i++) {
ZKTransaction txn = newTxn();
pool.allocate();
- LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
+ LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
// get the corresponding ledger allocator
SimpleLedgerAllocator sla = pool.getLedgerAllocator(lh);
@@ -176,7 +175,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
String slaPath = allocatePaths.get(i);
// execute the transaction to confirm/abort obtain
- FutureUtils.result(txn.execute());
+ Utils.ioResult(txn.execute());
// introduce error to individual ledger allocator
byte[] data = zkc.get().getData(slaPath, false, new Stat());
@@ -188,7 +187,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
try {
pool.allocate();
ZKTransaction txn = newTxn();
- LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
+ LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
// get the corresponding ledger allocator
SimpleLedgerAllocator sla = pool.getLedgerAllocator(lh);
@@ -197,7 +196,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
logger.info("Allocated ledger {} from path {}", lh.getId(), slaPath);
allocatedPathSet.add(slaPath);
- FutureUtils.result(txn.execute());
+ Utils.ioResult(txn.execute());
++numSuccess;
} catch (IOException ioe) {
// continue
@@ -229,7 +228,7 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
LedgerAllocatorPool pool = new LedgerAllocatorPool(allocationPath, 0, dlConf, zkc, bkc, allocationExecutor);
ZKTransaction txn = newTxn();
try {
- FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
+ Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
fail("Should fail obtain ledger handle if there is no allocator.");
} catch (SimpleLedgerAllocator.AllocationException ae) {
fail("Should fail obtain ledger handle if there is no allocator.");
@@ -251,8 +250,8 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
for (int i = 0; i < numLedgers; i++) {
pool.allocate();
ZKTransaction txn = newTxn();
- LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
- FutureUtils.result(txn.execute());
+ LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
+ Utils.ioResult(txn.execute());
allocatedLedgers.add(lh);
}
assertEquals(numLedgers, allocatedLedgers.size());
@@ -280,8 +279,8 @@ public class TestLedgerAllocatorPool extends TestDistributedLogBase {
for (int i = 0; i < numLedgers; i++) {
pool.allocate();
ZKTransaction txn = newTxn();
- LedgerHandle lh = FutureUtils.result(pool.tryObtain(txn, NULL_LISTENER));
- FutureUtils.result(txn.execute());
+ LedgerHandle lh = Utils.ioResult(pool.tryObtain(txn, NULL_LISTENER));
+ Utils.ioResult(txn.execute());
lh.close();
allocatedLedgers.putIfAbsent(lh.getId(), lh);
logger.info("[thread {}] allocate {}th ledger {}",
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java
index f371007..5efa7e4 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/PropertiesWriter.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,10 +20,12 @@ package org.apache.distributedlog.config;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Properties;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Writer to write properties to files.
+ */
public class PropertiesWriter {
static final Logger LOG = LoggerFactory.getLogger(PropertiesWriter.class);
@@ -57,7 +59,7 @@ public class PropertiesWriter {
public void save() throws Exception {
FileOutputStream outputStream = new FileOutputStream(configFile);
properties.store(outputStream, null);
- configFile.setLastModified(configFile.lastModified()+1000);
+ configFile.setLastModified(configFile.lastModified() + 1000);
LOG.debug("save modified={}", configFile.lastModified());
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java
deleted file mode 100644
index 9563511..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConcurrentBaseConfiguration.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
-
-public class TestConcurrentBaseConfiguration {
- static final Logger LOG = LoggerFactory.getLogger(TestConcurrentBaseConfiguration.class);
-
- @Test(timeout = 20000)
- public void testBasicOperations() throws Exception {
- ConcurrentBaseConfiguration conf = new ConcurrentBaseConfiguration();
- conf.setProperty("prop1", "1");
- assertEquals(1, conf.getInt("prop1"));
- conf.setProperty("prop1", "2");
- assertEquals(2, conf.getInt("prop1"));
- conf.clearProperty("prop1");
- assertEquals(null, conf.getInteger("prop1", null));
- conf.setProperty("prop1", "1");
- conf.setProperty("prop2", "2");
- assertEquals(1, conf.getInt("prop1"));
- assertEquals(2, conf.getInt("prop2"));
- conf.clearProperty("prop1");
- assertEquals(null, conf.getInteger("prop1", null));
- assertEquals(2, conf.getInt("prop2"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java
deleted file mode 100644
index 8420a97..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestConfigurationSubscription.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.config;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.commons.configuration.event.ConfigurationEvent;
-import org.apache.commons.configuration.event.ConfigurationListener;
-import org.jmock.lib.concurrent.DeterministicScheduler;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.junit.Assert.*;
-
-/**
- * Notes:
- * 1. lastModified granularity is platform dependent, generally 1 sec, so we can't wait 1ms for things to
- * get picked up.
- */
-public class TestConfigurationSubscription {
- static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class);
-
- /**
- * Give FileChangedReloadingStrategy some time to start reloading
- * Make sure now!=lastChecked
- * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()}
- */
- private void ensureConfigReloaded() throws InterruptedException {
- // sleep 1 ms so that System.currentTimeMillis() !=
- // lastChecked (the time we construct FileChangedReloadingStrategy
- Thread.sleep(1);
- }
-
- @Test(timeout = 60000)
- public void testReloadConfiguration() throws Exception {
- PropertiesWriter writer = new PropertiesWriter();
- FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
- ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
- DeterministicScheduler executorService = new DeterministicScheduler();
- List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
- ConfigurationSubscription confSub =
- new ConfigurationSubscription(conf, fileConfigBuilders, executorService, 100, TimeUnit.MILLISECONDS);
- final AtomicReference<ConcurrentBaseConfiguration> confHolder = new AtomicReference<>();
- confSub.registerListener(new org.apache.distributedlog.config.ConfigurationListener() {
- @Override
- public void onReload(ConcurrentBaseConfiguration conf) {
- confHolder.set(conf);
- }
- });
- assertEquals(null, conf.getProperty("prop1"));
-
- // add
- writer.setProperty("prop1", "1");
- writer.save();
- // ensure the file change reloading event can be triggered
- ensureConfigReloaded();
- // reload the config
- confSub.reload();
- assertNotNull(confHolder.get());
- assertTrue(conf == confHolder.get());
- assertEquals("1", conf.getProperty("prop1"));
- }
-
- @Test(timeout = 60000)
- public void testAddReloadBasicsConfig() throws Exception {
- PropertiesWriter writer = new PropertiesWriter();
- DeterministicScheduler mockScheduler = new DeterministicScheduler();
- FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
- ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
- List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
- ConfigurationSubscription confSub =
- new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
- assertEquals(null, conf.getProperty("prop1"));
-
- // add
- writer.setProperty("prop1", "1");
- writer.save();
- // ensure the file change reloading event can be triggered
- ensureConfigReloaded();
- mockScheduler.tick(100, TimeUnit.MILLISECONDS);
- assertEquals("1", conf.getProperty("prop1"));
-
- }
-
- @Test(timeout = 60000)
- public void testInitialConfigLoad() throws Exception {
- PropertiesWriter writer = new PropertiesWriter();
- writer.setProperty("prop1", "1");
- writer.setProperty("prop2", "abc");
- writer.setProperty("prop3", "123.0");
- writer.setProperty("prop4", "11132");
- writer.setProperty("prop5", "true");
- writer.save();
-
- ScheduledExecutorService mockScheduler = new DeterministicScheduler();
- FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
- ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
- List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
- ConfigurationSubscription confSub =
- new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
- assertEquals(1, conf.getInt("prop1"));
- assertEquals("abc", conf.getString("prop2"));
- assertEquals(123.0, conf.getFloat("prop3"), 0);
- assertEquals(11132, conf.getInt("prop4"));
- assertEquals(true, conf.getBoolean("prop5"));
- }
-
- @Test(timeout = 60000)
- public void testExceptionInConfigLoad() throws Exception {
- PropertiesWriter writer = new PropertiesWriter();
- writer.setProperty("prop1", "1");
- writer.save();
-
- DeterministicScheduler mockScheduler = new DeterministicScheduler();
- FileConfigurationBuilder builder = new PropertiesConfigurationBuilder(writer.getFile().toURI().toURL());
- ConcurrentConstConfiguration conf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
- List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(builder);
- ConfigurationSubscription confSub =
- new ConfigurationSubscription(conf, fileConfigBuilders, mockScheduler, 100, TimeUnit.MILLISECONDS);
-
- final AtomicInteger count = new AtomicInteger(1);
- conf.addConfigurationListener(new ConfigurationListener() {
- @Override
- public void configurationChanged(ConfigurationEvent event) {
- LOG.info("config changed {}", event);
- // Throw after so we actually see the update anyway.
- if (!event.isBeforeUpdate()) {
- count.getAndIncrement();
- throw new RuntimeException("config listener threw and exception");
- }
- }
- });
-
- int i = 0;
- int initial = 0;
- while (count.get() == initial) {
- writer.setProperty("prop1", Integer.toString(i++));
- writer.save();
- mockScheduler.tick(100, TimeUnit.MILLISECONDS);
- }
-
- initial = count.get();
- while (count.get() == initial) {
- writer.setProperty("prop1", Integer.toString(i++));
- writer.save();
- mockScheduler.tick(100, TimeUnit.MILLISECONDS);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
index b5d6300..21aa1c9 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicConfigurationFactory.java
@@ -23,12 +23,13 @@ import com.google.common.base.Optional;
import org.apache.distributedlog.DistributedLogConfiguration;
import java.io.File;
-import java.io.FileNotFoundException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.configuration.ConfigurationException;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
+import org.apache.distributedlog.common.config.PropertiesWriter;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java
index c1ac98a..2731af3 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/config/TestDynamicDistributedLogConfiguration.java
@@ -20,6 +20,8 @@ package org.apache.distributedlog.config;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.bk.QuorumConfig;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentConstConfiguration;
import org.junit.Test;
import static org.apache.distributedlog.DistributedLogConfiguration.*;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java
index 3ce4952..1064a6f 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestConfigurationFeatureProvider.java
@@ -17,7 +17,7 @@
*/
package org.apache.distributedlog.feature;
-import org.apache.distributedlog.config.ConcurrentBaseConfiguration;
+import org.apache.distributedlog.common.config.ConcurrentBaseConfiguration;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.SettableFeature;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
index 5d4472d..f8dd245 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java
@@ -18,8 +18,8 @@
package org.apache.distributedlog.feature;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.annotations.DistributedLogAnnotations;
-import org.apache.distributedlog.config.PropertiesWriter;
+import org.apache.distributedlog.common.annotations.DistributedLogAnnotations;
+import org.apache.distributedlog.common.config.PropertiesWriter;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Ignore;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
index b2fcbf6..db9fb31 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/TestZKLogMetadataStore.java
@@ -23,7 +23,6 @@ import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
@@ -89,12 +88,12 @@ public class TestZKLogMetadataStore extends TestDistributedLogBase {
@Test(timeout = 60000)
public void testCreateLog() throws Exception {
- assertEquals(uri, FutureUtils.result(metadataStore.createLog("test")));
+ assertEquals(uri, Utils.ioResult(metadataStore.createLog("test")));
}
@Test(timeout = 60000)
public void testGetLogLocation() throws Exception {
- Optional<URI> uriOptional = FutureUtils.result(metadataStore.getLogLocation("test"));
+ Optional<URI> uriOptional = Utils.ioResult(metadataStore.getLogLocation("test"));
assertTrue(uriOptional.isPresent());
assertEquals(uri, uriOptional.get());
}
@@ -107,7 +106,7 @@ public class TestZKLogMetadataStore extends TestDistributedLogBase {
logs.add(logName);
createLogInNamespace(uri, logName);
}
- Set<String> result = Sets.newHashSet(FutureUtils.result(metadataStore.getLogs()));
+ Set<String> result = Sets.newHashSet(Utils.ioResult(metadataStore.getLogs()));
assertEquals(10, result.size());
assertTrue(Sets.difference(logs, result).isEmpty());
}