You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/21 17:20:35 UTC
[03/23] incubator-distributedlog git commit: DL-124: Use Java8 Future rather than twitter Future
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
index 4d4a008..5189104 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/lock/TestZKSessionLock.java
@@ -17,6 +17,7 @@
*/
package org.apache.distributedlog.lock;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.ZooKeeperClient;
@@ -27,11 +28,10 @@ import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.lock.ZKSessionLock.State;
import org.apache.distributedlog.util.FailpointUtils;
import org.apache.distributedlog.util.OrderedScheduler;
-import com.twitter.util.Await;
-import com.twitter.util.Promise;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
@@ -43,7 +43,6 @@ import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import scala.runtime.BoxedUnit;
import java.io.IOException;
import java.util.Collections;
@@ -149,9 +148,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
String node2 = getLockIdFromPath(createLockNodeV2(zk, lockPath, clientId));
String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId));
- assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node1)));
- assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node2)));
- assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node3)));
+ assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node1)));
+ assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node2)));
+ assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node3)));
// Bad Lock Node Name
String node4 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member"));
@@ -160,15 +159,15 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
String node7 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode"));
String node8 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_badnode_badnode_badnode_badnode"));
- assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node4)));
- assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node5)));
- assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node6)));
- assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node7)));
- assertEquals(lockId, Await.result(asyncParseClientID(zk, lockPath, node8)));
+ assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node4)));
+ assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node5)));
+ assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node6)));
+ assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node7)));
+ assertEquals(lockId, Utils.ioResult(asyncParseClientID(zk, lockPath, node8)));
// Malformed Node Name
String node9 = getLockIdFromPath(createLockNodeWithBadNodeName(zk, lockPath, clientId, "member_malformed_s12345678_999999"));
- assertEquals(Pair.of("malformed", 12345678L), Await.result(asyncParseClientID(zk, lockPath, node9)));
+ assertEquals(Pair.of("malformed", 12345678L), Utils.ioResult(asyncParseClientID(zk, lockPath, node9)));
}
@Test(timeout = 60000)
@@ -256,7 +255,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals("counter should not be increased in different epochs", 1, counter.get());
// lock action would not be executed in same epoch and promise would be satisfied with exception
- Promise<BoxedUnit> promise = new Promise<BoxedUnit>();
+ CompletableFuture<Void> promise = new CompletableFuture<Void>();
lock.executeLockAction(lock.getEpoch().get() + 1, new LockAction() {
@Override
public void execute() {
@@ -269,7 +268,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
}
}, promise);
try {
- Await.result(promise);
+ Utils.ioResult(promise);
fail("Should satisfy promise with epoch changed exception.");
} catch (EpochChangedException ece) {
// expected
@@ -457,7 +456,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
- assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
// lock should fail on a success lock
try {
@@ -469,7 +468,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock.getLockState());
children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
- assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
// unlock
lock.unlock();
@@ -546,7 +545,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
try {
lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
@@ -559,7 +558,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLOSED, lock1.getLockState());
children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock0.unlock();
// verification after unlock lock0
@@ -574,7 +573,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock2.getLockState());
children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
- assertEquals(lock2.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock2.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
lock2.unlock();
}
@@ -649,7 +648,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
final CountDownLatch lock1DoneLatch = new CountDownLatch(1);
@@ -687,7 +686,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock1.getLockState());
children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
lock1.unlock();
}
@@ -719,7 +718,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock.getLockState());
List<String> children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
- assertEquals(lock.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
expiredLatch.await();
@@ -806,7 +805,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId1, lockStateExecutor);
final CountDownLatch lock1DoneLatch = new CountDownLatch(1);
@@ -830,9 +829,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(2, children.size());
assertEquals(State.CLAIMED, lock0.getLockState());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
awaitState(State.WAITING, lock1);
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
// expire lock1
ZooKeeperClientUtils.expireSession(zkc, zkServers, sessionTimeoutMs);
@@ -843,7 +842,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLOSED, lock1.getLockState());
children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
}
public void awaitState(State state, ZKSessionLock lock) throws InterruptedException {
@@ -891,7 +890,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLOSED, lock1_0.getLockState());
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
// lock1_1 would wait the ownership
final ZKSessionLock lock1_1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
@@ -917,9 +916,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(2, children.size());
assertEquals(State.CLAIMED, lock0.getLockState());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
awaitState(State.WAITING, lock1_1);
- assertEquals(lock1_1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+ assertEquals(lock1_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
if (isUnlock) {
lock0.unlock();
@@ -938,7 +937,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(State.CLAIMED, lock1_1.getLockState());
children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
- assertEquals(lock1_1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock1_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
lock1_1.unlock();
}
@@ -1040,9 +1039,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(2, children.size());
assertEquals(State.CLAIMED, lock0_0.getLockState());
- assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
awaitState(State.WAITING, lock1);
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
final CountDownLatch lock0DoneLatch = new CountDownLatch(1);
final AtomicReference<String> ownerFromLock0 = new AtomicReference<String>(null);
@@ -1058,9 +1057,9 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
children = getLockWaiters(zkc, lockPath);
assertEquals(2, children.size());
assertEquals(State.CLAIMED, lock0_0.getLockState());
- assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(State.WAITING, lock1.getLockState());
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
} else {
lock0Thread = new Thread(new Runnable() {
@Override
@@ -1087,11 +1086,11 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
assertEquals(3, children.size());
assertEquals(State.CLAIMED, lock0_0.getLockState());
- assertEquals(lock0_0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0_0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
awaitState(State.WAITING, lock1);
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(1))));
awaitState(State.WAITING, lock0_1);
- assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(2))));
+ assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(2))));
}
if (isUnlock) {
@@ -1114,7 +1113,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
assertEquals(State.CLAIMED, lock1.getLockState());
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
} else {
assertNotNull(lock0Thread);
if (!isUnlock) {
@@ -1128,14 +1127,14 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
assertEquals(State.CLAIMED, lock1.getLockState());
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
} else {
children = getLockWaiters(zkc, lockPath);
assertEquals(2, children.size());
assertEquals(State.CLAIMED, lock1.getLockState());
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc.get(), lockPath, children.get(0))));
assertEquals(State.WAITING, lock0_1.getLockState());
- assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
+ assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
}
}
@@ -1148,7 +1147,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
children = getLockWaiters(zkc, lockPath);
assertEquals(1, children.size());
assertEquals(State.CLAIMED, lock0_1.getLockState());
- assertEquals(lock0_1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0_1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
}
}
@@ -1186,15 +1185,15 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
List<String> children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(State.CLAIMED, lock0.getLockState());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock1.tryLock(timeout, TimeUnit.MILLISECONDS);
children = getLockWaiters(zkc0, lockPath);
assertEquals(2, children.size());
assertEquals(State.CLAIMED, lock0.getLockState());
- assertEquals(lock0.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock0.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
assertEquals(State.CLAIMED, lock1.getLockState());
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(1))));
if (isUnlock) {
lock0.unlock();
@@ -1202,7 +1201,7 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
children = getLockWaiters(zkc0, lockPath);
assertEquals(1, children.size());
assertEquals(State.CLAIMED, lock1.getLockState());
- assertEquals(lock1.getLockId(), Await.result(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
+ assertEquals(lock1.getLockId(), Utils.ioResult(asyncParseClientID(zkc0.get(), lockPath, children.get(0))));
lock1.unlock();
} else {
ZooKeeperClientUtils.expireSession(zkc0, zkServers, sessionTimeoutMs);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java b/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java
index 5943b64..6687b7b 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/logsegment/TestRollingPolicy.java
@@ -17,7 +17,7 @@
*/
package org.apache.distributedlog.logsegment;
-import org.apache.distributedlog.util.Sizable;
+import org.apache.distributedlog.common.util.Sizable;
import org.junit.Test;
import static org.junit.Assert.*;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java b/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
index e18fb3f..2090828 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/metadata/TestLogSegmentMetadataStoreUpdater.java
@@ -24,12 +24,11 @@ import org.apache.distributedlog.LogRecordWithDLSN;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
-import org.apache.distributedlog.ZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.apache.distributedlog.impl.ZKLogSegmentMetadataStore;
import org.apache.distributedlog.logsegment.LogSegmentMetadataStore;
-import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
+import org.apache.distributedlog.util.Utils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.junit.After;
@@ -105,7 +104,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
// Dryrun
MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore);
- FutureUtils.result(dryrunUpdater.changeSequenceNumber(segment, 6L));
+ Utils.ioResult(dryrunUpdater.changeSequenceNumber(segment, 6L));
segmentList = readLogSegments(ledgerPath);
assertEquals(5, segmentList.size());
@@ -113,7 +112,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
// Fix the inprogress log segments
MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
- FutureUtils.result(updater.changeSequenceNumber(segment, 6L));
+ Utils.ioResult(updater.changeSequenceNumber(segment, 6L));
segmentList = readLogSegments(ledgerPath);
assertEquals(6, segmentList.size());
@@ -156,19 +155,19 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
// Dryrun
MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore);
try {
- FutureUtils.result(dryrunUpdater.updateLastRecord(completedLogSegment, badRecord));
+ Utils.ioResult(dryrunUpdater.updateLastRecord(completedLogSegment, badRecord));
fail("Should fail on updating dlsn that in different log segment");
} catch (IllegalArgumentException iae) {
// expected
}
try {
- FutureUtils.result(dryrunUpdater.updateLastRecord(inprogressLogSegment, goodRecord2));
+ Utils.ioResult(dryrunUpdater.updateLastRecord(inprogressLogSegment, goodRecord2));
fail("Should fail on updating dlsn for an inprogress log segment");
} catch (IllegalStateException ise) {
// expected
}
LogSegmentMetadata updatedCompletedLogSegment =
- FutureUtils.result(dryrunUpdater.updateLastRecord(completedLogSegment, goodRecord1));
+ Utils.ioResult(dryrunUpdater.updateLastRecord(completedLogSegment, goodRecord1));
assertEquals(goodLastDLSN1, updatedCompletedLogSegment.getLastDLSN());
assertEquals(goodRecord1.getTransactionId(), updatedCompletedLogSegment.getLastTxId());
assertTrue(updatedCompletedLogSegment.isRecordLastPositioninThisSegment(goodRecord1));
@@ -187,18 +186,18 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
// Fix the last dlsn
MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
try {
- FutureUtils.result(updater.updateLastRecord(completedLogSegment, badRecord));
+ Utils.ioResult(updater.updateLastRecord(completedLogSegment, badRecord));
fail("Should fail on updating dlsn that in different log segment");
} catch (IllegalArgumentException iae) {
// expected
}
try {
- FutureUtils.result(updater.updateLastRecord(inprogressLogSegment, goodRecord2));
+ Utils.ioResult(updater.updateLastRecord(inprogressLogSegment, goodRecord2));
fail("Should fail on updating dlsn for an inprogress log segment");
} catch (IllegalStateException ise) {
// expected
}
- updatedCompletedLogSegment = FutureUtils.result(updater.updateLastRecord(completedLogSegment, goodRecord1));
+ updatedCompletedLogSegment = Utils.ioResult(updater.updateLastRecord(completedLogSegment, goodRecord1));
assertEquals(goodLastDLSN1, updatedCompletedLogSegment.getLastDLSN());
assertEquals(goodRecord1.getTransactionId(), updatedCompletedLogSegment.getLastTxId());
assertTrue(updatedCompletedLogSegment.isRecordLastPositioninThisSegment(goodRecord1));
@@ -245,28 +244,28 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
// Dryrun
MetadataUpdater dryrunUpdater = new DryrunLogSegmentMetadataStoreUpdater(conf, metadataStore);
- FutureUtils.result(dryrunUpdater.setLogSegmentTruncated(segmentList.get(segmentToModify)));
+ Utils.ioResult(dryrunUpdater.setLogSegmentTruncated(segmentList.get(segmentToModify)));
segmentList = readLogSegments(ledgerPath);
assertEquals(false, segmentList.get(segmentToModify).isTruncated());
// change truncation for the 1st log segment
MetadataUpdater updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
- FutureUtils.result(updater.setLogSegmentTruncated(segmentList.get(segmentToModify)));
+ Utils.ioResult(updater.setLogSegmentTruncated(segmentList.get(segmentToModify)));
segmentList = readLogSegments(ledgerPath);
assertEquals(true, segmentList.get(segmentToModify).isTruncated());
assertEquals(false, segmentList.get(segmentToModify).isPartiallyTruncated());
updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
- FutureUtils.result(updater.setLogSegmentActive(segmentList.get(segmentToModify)));
+ Utils.ioResult(updater.setLogSegmentActive(segmentList.get(segmentToModify)));
segmentList = readLogSegments(ledgerPath);
assertEquals(false, segmentList.get(segmentToModify).isTruncated());
assertEquals(false, segmentList.get(segmentToModify).isPartiallyTruncated());
updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
- FutureUtils.result(updater.setLogSegmentPartiallyTruncated(segmentList.get(segmentToModify),
+ Utils.ioResult(updater.setLogSegmentPartiallyTruncated(segmentList.get(segmentToModify),
segmentList.get(segmentToModify).getFirstDLSN()));
segmentList = readLogSegments(ledgerPath);
@@ -274,7 +273,7 @@ public class TestLogSegmentMetadataStoreUpdater extends ZooKeeperClusterTestCase
assertEquals(true, segmentList.get(segmentToModify).isPartiallyTruncated());
updater = LogSegmentMetadataStoreUpdater.createMetadataUpdater(conf, metadataStore);
- FutureUtils.result(updater.setLogSegmentActive(segmentList.get(segmentToModify)));
+ Utils.ioResult(updater.setLogSegmentActive(segmentList.get(segmentToModify)));
segmentList = readLogSegments(ledgerPath);
assertEquals(false, segmentList.get(segmentToModify).isTruncated());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
deleted file mode 100644
index 46a3a6f..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestDistributedLogNamespaceBuilder.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.namespace;
-
-import org.apache.distributedlog.BKDistributedLogNamespace;
-import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.TestDistributedLogBase;
-import org.junit.Test;
-
-import java.net.URI;
-
-import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test Namespace Builder
- */
-public class TestDistributedLogNamespaceBuilder extends TestDistributedLogBase {
-
- @Test(timeout = 60000, expected = NullPointerException.class)
- public void testEmptyBuilder() throws Exception {
- DistributedLogNamespaceBuilder.newBuilder().build();
- }
-
- @Test(timeout = 60000, expected = NullPointerException.class)
- public void testMissingUri() throws Exception {
- DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .build();
- }
-
- @Test(timeout = 60000, expected = NullPointerException.class)
- public void testMissingSchemeInUri() throws Exception {
- DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(new URI("/test"))
- .build();
- }
-
- @Test(timeout = 60000, expected = IllegalArgumentException.class)
- public void testInvalidSchemeInUri() throws Exception {
- DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(new URI("dist://invalid/scheme/in/uri"))
- .build();
- }
-
- @Test(timeout = 60000, expected = IllegalArgumentException.class)
- public void testInvalidSchemeCorrectBackendInUri() throws Exception {
- DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(new URI("dist-bk://invalid/scheme/in/uri"))
- .build();
- }
-
- @Test(timeout = 60000, expected = IllegalArgumentException.class)
- public void testUnknownBackendInUri() throws Exception {
- DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(new URI("distributedlog-unknown://invalid/scheme/in/uri"))
- .build();
- }
-
- @Test(timeout = 60000, expected = NullPointerException.class)
- public void testNullStatsLogger() throws Exception {
- DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(new URI("distributedlog-bk://localhost/distributedlog"))
- .statsLogger(null)
- .build();
- }
-
- @Test(timeout = 60000, expected = NullPointerException.class)
- public void testNullClientId() throws Exception {
- DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(new URI("distributedlog-bk://localhost/distributedlog"))
- .clientId(null)
- .build();
- }
-
- @Test(timeout = 60000)
- public void testBuildBKDistributedLogNamespace() throws Exception {
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"))
- .build();
- try {
- assertTrue("distributedlog-bk:// should build bookkeeper based distributedlog namespace",
- namespace instanceof BKDistributedLogNamespace);
- } finally {
- namespace.close();
- }
- }
-
- @Test(timeout = 60000)
- public void testBuildWhenMissingBackendInUri() throws Exception {
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(new URI("distributedlog://" + zkServers + DLOG_NAMESPACE + "/defaultnamespace"))
- .build();
- try {
- assertTrue("distributedlog:// should build bookkeeper based distributedlog namespace",
- namespace instanceof BKDistributedLogNamespace);
- } finally {
- namespace.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
new file mode 100644
index 0000000..89b4852
--- /dev/null
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/namespace/TestNamespaceBuilder.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.namespace;
+
+import org.apache.distributedlog.BKDistributedLogNamespace;
+import org.apache.distributedlog.DistributedLogConfiguration;
+import org.apache.distributedlog.TestDistributedLogBase;
+import org.apache.distributedlog.api.namespace.Namespace;
+import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.junit.Test;
+
+import java.net.URI;
+
+import static org.apache.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test Namespace Builder
+ */
+public class TestNamespaceBuilder extends TestDistributedLogBase {
+
+ @Test(timeout = 60000, expected = NullPointerException.class)
+ public void testEmptyBuilder() throws Exception {
+ NamespaceBuilder.newBuilder().build();
+ }
+
+ @Test(timeout = 60000, expected = NullPointerException.class)
+ public void testMissingUri() throws Exception {
+ NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .build();
+ }
+
+ @Test(timeout = 60000, expected = NullPointerException.class)
+ public void testMissingSchemeInUri() throws Exception {
+ NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(new URI("/test"))
+ .build();
+ }
+
+ @Test(timeout = 60000, expected = IllegalArgumentException.class)
+ public void testInvalidSchemeInUri() throws Exception {
+ NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(new URI("dist://invalid/scheme/in/uri"))
+ .build();
+ }
+
+ @Test(timeout = 60000, expected = IllegalArgumentException.class)
+ public void testInvalidSchemeCorrectBackendInUri() throws Exception {
+ NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(new URI("dist-bk://invalid/scheme/in/uri"))
+ .build();
+ }
+
+ @Test(timeout = 60000, expected = IllegalArgumentException.class)
+ public void testUnknownBackendInUri() throws Exception {
+ NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(new URI("distributedlog-unknown://invalid/scheme/in/uri"))
+ .build();
+ }
+
+ @Test(timeout = 60000, expected = NullPointerException.class)
+ public void testNullStatsLogger() throws Exception {
+ NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(new URI("distributedlog-bk://localhost/distributedlog"))
+ .statsLogger(null)
+ .build();
+ }
+
+ @Test(timeout = 60000, expected = NullPointerException.class)
+ public void testNullClientId() throws Exception {
+ NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(new URI("distributedlog-bk://localhost/distributedlog"))
+ .clientId(null)
+ .build();
+ }
+
+ @Test(timeout = 60000)
+ public void testBuildBKDistributedLogNamespace() throws Exception {
+ Namespace namespace = NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace"))
+ .build();
+ try {
+ assertTrue("distributedlog-bk:// should build bookkeeper based distributedlog namespace",
+ namespace instanceof BKDistributedLogNamespace);
+ } finally {
+ namespace.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testBuildWhenMissingBackendInUri() throws Exception {
+ Namespace namespace = NamespaceBuilder.newBuilder()
+ .conf(new DistributedLogConfiguration())
+ .uri(new URI("distributedlog://" + zkServers + DLOG_NAMESPACE + "/defaultnamespace"))
+ .build();
+ try {
+ assertTrue("distributedlog:// should build bookkeeper based distributedlog namespace",
+ namespace instanceof BKDistributedLogNamespace);
+ } finally {
+ namespace.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java b/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java
deleted file mode 100644
index 8949bec..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/rate/TestMovingAverageRate.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.rate;
-
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.MockTimer;
-import com.twitter.util.Time$;
-import com.twitter.util.TimeControl;
-
-import org.junit.Test;
-import scala.runtime.BoxedUnit;
-
-import static org.junit.Assert.*;
-
-public class TestMovingAverageRate {
- interface TcCallback {
- void apply(TimeControl tc);
- }
-
- void withCurrentTimeFrozen(final TcCallback cb) {
- Time$.MODULE$.withCurrentTimeFrozen(new Function<TimeControl, BoxedUnit>() {
- @Override
- public BoxedUnit apply(TimeControl time) {
- cb.apply(time);
- return BoxedUnit.UNIT;
- }
- });
- }
-
- private void advance(TimeControl time, MockTimer timer, int timeMs) {
- Duration duration = Duration.fromMilliseconds(timeMs);
- time.advance(duration);
- timer.tick();
- }
-
- @Test(timeout = 60000)
- public void testNoChangeInUnderMinInterval() {
- withCurrentTimeFrozen(new TcCallback() {
- @Override
- public void apply(TimeControl time) {
- MockTimer timer = new MockTimer();
- MovingAverageRateFactory factory = new MovingAverageRateFactory(timer);
- MovingAverageRate avg60 = factory.create(60);
- avg60.add(1000);
- assertEquals(0, avg60.get(), 0);
- advance(time, timer, 1);
- assertEquals(0, avg60.get(), 0);
- advance(time, timer, 1);
- assertEquals(0, avg60.get(), 0);
- }
- });
- }
-
- @Test(timeout = 60000)
- public void testFactoryWithMultipleTimers() {
- withCurrentTimeFrozen(new TcCallback() {
- @Override
- public void apply(TimeControl time) {
- MockTimer timer = new MockTimer();
- MovingAverageRateFactory factory = new MovingAverageRateFactory(timer);
- MovingAverageRate avg60 = factory.create(60);
- MovingAverageRate avg30 = factory.create(30);
-
- // Can't test this precisely because the Rate class uses its own
- // ticker. So we can control when it gets sampled but not the time
- // value it uses. So, just do basic validation.
- for (int i = 0; i < 30; i++) {
- avg60.add(100);
- avg30.add(100);
- advance(time, timer, 1000);
- }
- double s1 = avg60.get();
- assertTrue(avg30.get() > 0);
- for (int i = 0; i < 30; i++) {
- advance(time, timer, 1000);
- }
- assertTrue(avg60.get() > 0);
- assertTrue(avg60.get() < s1);
- assertEquals(0.0, avg30.get(), 0);
- }
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java
index 47e2fae..71bf68d 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/tools/TestDistributedLogTool.java
@@ -22,11 +22,11 @@ import java.net.URI;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DLSN;
import org.apache.distributedlog.DistributedLogConfiguration;
-import org.apache.distributedlog.DistributedLogManager;
+import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.LocalDLMEmulator;
import org.apache.distributedlog.LogRecordWithDLSN;
-import org.apache.distributedlog.LogReader;
+import org.apache.distributedlog.api.LogReader;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.tools.DistributedLogTool.*;
import org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java
deleted file mode 100644
index f9e4eb8..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestFutureUtils.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.fail;
-
-/**
- * Test Case for {@link FutureUtils}
- */
-public class TestFutureUtils {
-
- static class TestException extends IOException {
- }
-
- @Test(timeout = 60000)
- public void testWithin() throws Exception {
- OrderedScheduler scheduler = OrderedScheduler.newBuilder()
- .corePoolSize(1)
- .name("test-within")
- .build();
- final Promise<Void> promiseToTimeout = new Promise<Void>();
- final Promise<Void> finalPromise = new Promise<Void>();
- FutureUtils.within(
- promiseToTimeout,
- 10,
- TimeUnit.MILLISECONDS,
- new TestException(),
- scheduler,
- "test-within"
- ).addEventListener(new FutureEventListener<Void>() {
- @Override
- public void onFailure(Throwable cause) {
- FutureUtils.setException(finalPromise, cause);
- }
-
- @Override
- public void onSuccess(Void value) {
- FutureUtils.setValue(finalPromise, value);
- }
- });
- try {
- FutureUtils.result(finalPromise);
- fail("Should fail with TestException");
- } catch (TestException te) {
- // expected
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java
index 802649d..807ce02 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestPermitManager.java
@@ -17,6 +17,7 @@
*/
package org.apache.distributedlog.util;
+import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.zk.LimitedPermitManager;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java
deleted file mode 100644
index 7bfe5ed..0000000
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestSafeQueueingFuturePool.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.FuturePool;
-import com.twitter.util.ExecutorServiceFuturePool;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Future;
-
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
-
-import scala.runtime.BoxedUnit;
-
-public class TestSafeQueueingFuturePool {
- static final Logger LOG = LoggerFactory.getLogger(TestSafeQueueingFuturePool.class);
-
- @Rule
- public TestName runtime = new TestName();
-
- class TestFuturePool<T> {
- final ScheduledExecutorService executor;
- final FuturePool pool;
- final SafeQueueingFuturePool<T> wrapper;
- TestFuturePool() {
- executor = Executors.newScheduledThreadPool(1);
- pool = new ExecutorServiceFuturePool(executor);
- wrapper = new SafeQueueingFuturePool<T>(pool);
- }
- public void shutdown() {
- executor.shutdown();
- }
- }
-
- @Test(timeout = 60000)
- public void testSimpleSuccess() throws Exception {
- TestFuturePool<Void> pool = new TestFuturePool<Void>();
- final AtomicBoolean result = new AtomicBoolean(false);
- Future<Void> future = pool.wrapper.apply(new Function0<Void>() {
- public Void apply() {
- result.set(true);
- return null;
- }
- });
- Await.result(future);
- assertTrue(result.get());
- pool.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testSimpleFailure() throws Exception {
- TestFuturePool<Void> pool = new TestFuturePool<Void>();
- Future<Void> future = pool.wrapper.apply(new Function0<Void>() {
- public Void apply() {
- throw new RuntimeException("failed");
- }
- });
- try {
- Await.result(future);
- fail("should have thrown");
- } catch (Exception ex) {
- }
- pool.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testFailedDueToClosed() throws Exception {
- TestFuturePool<Void> pool = new TestFuturePool<Void>();
- pool.wrapper.close();
- Future<Void> future = pool.wrapper.apply(new Function0<Void>() {
- public Void apply() {
- throw new RuntimeException("failed");
- }
- });
- try {
- Await.result(future);
- fail("should have thrown");
- } catch (RejectedExecutionException ex) {
- }
- pool.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testRejectedFailure() throws Exception {
- TestFuturePool<Void> pool = new TestFuturePool<Void>();
- final AtomicBoolean result = new AtomicBoolean(false);
- pool.executor.shutdown();
- final CountDownLatch latch = new CountDownLatch(1);
- Future<Void> future = pool.wrapper.apply(new Function0<Void>() {
- public Void apply() {
- result.set(true);
- latch.countDown();
- return null;
- }
- });
- try {
- Await.result(future);
- fail("should have thrown");
- } catch (RejectedExecutionException ex) {
- }
- assertFalse(result.get());
- pool.wrapper.close();
- latch.await();
- assertTrue(result.get());
- pool.shutdown();
- }
-
- @Test(timeout = 60000)
- public void testRejectedBackupFailure() throws Exception {
- TestFuturePool<Void> pool = new TestFuturePool<Void>();
- final AtomicBoolean result = new AtomicBoolean(false);
- pool.executor.shutdownNow();
- final CountDownLatch latch1 = new CountDownLatch(1);
- final CountDownLatch latch2 = new CountDownLatch(1);
- Future<Void> future1 = pool.wrapper.apply(new Function0<Void>() {
- public Void apply() {
- try {
- latch1.await();
- } catch (Exception ex) {
- }
- return null;
- }
- });
-
- // Enqueue a set of futures behind.
- final int blockedCount = 100;
- final ArrayList<Future<Void>> blockedFutures = new ArrayList<Future<Void>>(blockedCount);
- final int[] doneArray = new int[blockedCount];
- final AtomicInteger doneCount = new AtomicInteger(0);
- for (int i = 0; i < blockedCount; i++) {
- final int index = i;
- blockedFutures.add(pool.wrapper.apply(new Function0<Void>() {
- public Void apply() {
- doneArray[index] = doneCount.getAndIncrement();
- return null;
- }
- }));
- }
-
- // All the futures fail when the executor is force closed.
- latch1.countDown();
- pool.executor.shutdownNow();
- for (int i = 0; i < blockedCount; i++) {
- try {
- Await.result(blockedFutures.get(i));
- fail("should have thrown");
- } catch (RejectedExecutionException ex) {
- }
- }
-
- // None of them have completed.
- for (int i = 0; i < blockedCount; i++) {
- assertEquals(0, doneArray[i]);
- }
-
- // Close cleans up all pending ops in order.
- pool.wrapper.close();
- for (int i = 0; i < blockedCount; i++) {
- assertEquals(i, doneArray[i]);
- }
-
- pool.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
index a9db6e0..acd441c 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/util/TestUtils.java
@@ -108,15 +108,15 @@ public class TestUtils extends ZooKeeperClusterTestCase {
@Test(timeout = 60000)
public void testZkGetData() throws Exception {
String path1 = "/zk-get-data/non-existent-path";
- Versioned<byte[]> data = FutureUtils.result(Utils.zkGetData(zkc.get(), path1, false));
+ Versioned<byte[]> data = Utils.ioResult(Utils.zkGetData(zkc.get(), path1, false));
assertNull("No data should return from non-existent-path", data.getValue());
assertNull("No version should return from non-existent-path", data.getVersion());
String path2 = "/zk-get-data/path2";
byte[] rawData = "test-data".getBytes(UTF_8);
- FutureUtils.result(Utils.zkAsyncCreateFullPathOptimistic(zkc, path2, rawData,
+ Utils.ioResult(Utils.zkAsyncCreateFullPathOptimistic(zkc, path2, rawData,
zkc.getDefaultACL(), CreateMode.PERSISTENT));
- data = FutureUtils.result(Utils.zkGetData(zkc.get(), path2, false));
+ data = Utils.ioResult(Utils.zkGetData(zkc.get(), path2, false));
assertArrayEquals("Data should return as written",
rawData, data.getValue());
assertEquals("Version should be zero",
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/pom.xml b/distributedlog-protocol/pom.xml
index a483444..7197feb 100644
--- a/distributedlog-protocol/pom.xml
+++ b/distributedlog-protocol/pom.xml
@@ -26,35 +26,15 @@
<name>Apache DistributedLog :: Protocol</name>
<dependencies>
<dependency>
- <groupId>org.apache.bookkeeper.stats</groupId>
- <artifactId>bookkeeper-stats-api</artifactId>
- <version>${bookkeeper.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>org.apache.distributedlog</groupId>
+ <artifactId>distributedlog-common</artifactId>
+ <version>${project.version}</version>
</dependency>
<dependency>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- <version>${commons-lang.version}</version>
- </dependency>
- <dependency>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- <version>${commons-codec.version}</version>
- </dependency>
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>finagle-core_2.11</artifactId>
- <version>${finagle.version}</version>
- </dependency>
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- <version>${slf4j.version}</version>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>${lombok.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>net.jpountz.lz4</groupId>
@@ -67,6 +47,18 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
index 9d2d7a7..2a60ff3 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/EnvelopedRecordSetWriter.java
@@ -26,12 +26,6 @@ import static org.apache.distributedlog.LogRecordSet.METADATA_VERSION_MASK;
import static org.apache.distributedlog.LogRecordSet.NULL_OP_STATS_LOGGER;
import static org.apache.distributedlog.LogRecordSet.VERSION;
-import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.exceptions.WriteException;
-import org.apache.distributedlog.io.Buffer;
-import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.distributedlog.io.CompressionUtils;
-import com.twitter.util.Promise;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -39,6 +33,12 @@ import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.Buffer;
+import org.apache.distributedlog.io.CompressionCodec;
+import org.apache.distributedlog.io.CompressionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -52,7 +52,7 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
private final Buffer buffer;
private final DataOutputStream writer;
private final WritableByteChannel writeChannel;
- private final List<Promise<DLSN>> promiseList;
+ private final List<CompletableFuture<DLSN>> promiseList;
private final CompressionCodec.Type codec;
private final int codecCode;
private int count = 0;
@@ -61,7 +61,7 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
EnvelopedRecordSetWriter(int initialBufferSize,
CompressionCodec.Type codec) {
this.buffer = new Buffer(Math.max(initialBufferSize, HEADER_LEN));
- this.promiseList = new LinkedList<Promise<DLSN>>();
+ this.promiseList = new LinkedList<CompletableFuture<DLSN>>();
this.codec = codec;
switch (codec) {
case LZ4:
@@ -84,13 +84,13 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
this.writeChannel = Channels.newChannel(writer);
}
- synchronized List<Promise<DLSN>> getPromiseList() {
+ synchronized List<CompletableFuture<DLSN>> getPromiseList() {
return promiseList;
}
@Override
public synchronized void writeRecord(ByteBuffer record,
- Promise<DLSN> transmitPromise)
+ CompletableFuture<DLSN> transmitPromise)
throws LogRecordTooLongException, WriteException {
int logRecordSize = record.remaining();
if (logRecordSize > MAX_LOGRECORD_SIZE) {
@@ -111,16 +111,16 @@ class EnvelopedRecordSetWriter implements LogRecordSet.Writer {
private synchronized void satisfyPromises(long lssn, long entryId, long startSlotId) {
long nextSlotId = startSlotId;
- for (Promise<DLSN> promise : promiseList) {
- promise.setValue(new DLSN(lssn, entryId, nextSlotId));
+ for (CompletableFuture<DLSN> promise : promiseList) {
+ promise.complete(new DLSN(lssn, entryId, nextSlotId));
nextSlotId++;
}
promiseList.clear();
}
private synchronized void cancelPromises(Throwable reason) {
- for (Promise<DLSN> promise : promiseList) {
- promise.setException(reason);
+ for (CompletableFuture<DLSN> promise : promiseList) {
+ promise.completeExceptionally(reason);
}
promiseList.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
index 375ed3f..55b20ff 100644
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
+++ b/distributedlog-protocol/src/main/java/org/apache/distributedlog/LogRecordSet.java
@@ -19,15 +19,15 @@ package org.apache.distributedlog;
import static com.google.common.base.Preconditions.checkArgument;
-import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.exceptions.WriteException;
-import org.apache.distributedlog.io.CompressionCodec;
-import com.twitter.util.Promise;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
+import org.apache.distributedlog.exceptions.LogRecordTooLongException;
+import org.apache.distributedlog.exceptions.WriteException;
+import org.apache.distributedlog.io.CompressionCodec;
/**
* A set of {@link LogRecord}s.
@@ -134,7 +134,7 @@ public class LogRecordSet {
* @throws LogRecordTooLongException if the record is too long
* @throws WriteException when encountered exception writing the record
*/
- void writeRecord(ByteBuffer record, Promise<DLSN> transmitPromise)
+ void writeRecord(ByteBuffer record, CompletableFuture<DLSN> transmitPromise)
throws LogRecordTooLongException, WriteException;
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java
deleted file mode 100644
index f8cdea4..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/DistributedLogAnnotations.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.annotations;
-
-/**
- * Common annotation types.
- */
-public class DistributedLogAnnotations {
- /**
- * Annotation to identify flaky tests in DistributedLog.
- * As and when we find that a test is flaky, we'll add this annotation to it for reference.
- */
- public @interface FlakyTest {}
-
- /**
- * Annotation to specify the occurrence of a compression operation. These are CPU intensive
- * and should be avoided in low-latency paths.
- */
- public @interface Compression {}
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java
deleted file mode 100644
index 0922f14..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/annotations/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Defines annotations used across distributedlog project.
- */
-package org.apache.distributedlog.annotations;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java
deleted file mode 100644
index b5280c9..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/BitMaskUtils.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.util;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-/**
- * Utils for bit mask operations.
- */
-public class BitMaskUtils {
-
- /**
- * 1) Unset all bits where value in mask is set.
- * 2) Set these bits to value specified by newValue.
- *
- * <p>e.g.
- * if oldValue = 1010, mask = 0011, newValue = 0001
- * 1) 1010 -> 1000
- * 2) 1000 -> 1001
- *
- * @param oldValue expected old value
- * @param mask the mask of the value for updates
- * @param newValue new value to set
- * @return updated value
- */
- public static long set(long oldValue, long mask, long newValue) {
- checkArgument(oldValue >= 0L && mask >= 0L && newValue >= 0L);
- return ((oldValue & (~mask)) | (newValue & mask));
- }
-
- /**
- * Get the bits where mask is 1.
- *
- * @param value value
- * @param mask mask of the value
- * @return the bit of the mask
- */
- public static long get(long value, long mask) {
- checkArgument(value >= 0L && mask >= 0L);
- return (value & mask);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java b/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java
deleted file mode 100644
index ee17950..0000000
--- a/distributedlog-protocol/src/main/java/org/apache/distributedlog/util/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * defines the utilities used across the project.
- */
-package org.apache.distributedlog.util;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
index 95e03ab..1c5db24 100644
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
+++ b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestLogRecordSet.java
@@ -25,15 +25,14 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;
import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.LogRecordSet.Reader;
import org.apache.distributedlog.LogRecordSet.Writer;
import org.apache.distributedlog.exceptions.LogRecordTooLongException;
import org.apache.distributedlog.io.CompressionCodec.Type;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import java.nio.ByteBuffer;
-import java.util.List;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.junit.Test;
/**
@@ -72,7 +71,7 @@ public class TestLogRecordSet {
ByteBuffer dataBuf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
try {
- writer.writeRecord(dataBuf, new Promise<DLSN>());
+ writer.writeRecord(dataBuf, new CompletableFuture<DLSN>());
fail("Should fail on writing large record");
} catch (LogRecordTooLongException lrtle) {
// expected
@@ -111,18 +110,18 @@ public class TestLogRecordSet {
assertEquals("zero user bytes", HEADER_LEN, writer.getNumBytes());
assertEquals("zero records", 0, writer.getNumRecords());
- List<Future<DLSN>> writePromiseList = Lists.newArrayList();
+ List<CompletableFuture<DLSN>> writePromiseList = Lists.newArrayList();
/// write first 5 records
for (int i = 0; i < 5; i++) {
ByteBuffer record = ByteBuffer.wrap(("record-" + i).getBytes(UTF_8));
- Promise<DLSN> writePromise = new Promise<DLSN>();
+ CompletableFuture<DLSN> writePromise = new CompletableFuture<>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 1) + " records", (i + 1), writer.getNumRecords());
}
ByteBuffer dataBuf = ByteBuffer.allocate(MAX_LOGRECORD_SIZE + 1);
try {
- writer.writeRecord(dataBuf, new Promise<DLSN>());
+ writer.writeRecord(dataBuf, new CompletableFuture<>());
fail("Should fail on writing large record");
} catch (LogRecordTooLongException lrtle) {
// expected
@@ -132,7 +131,7 @@ public class TestLogRecordSet {
/// write another 5 records
for (int i = 0; i < 5; i++) {
ByteBuffer record = ByteBuffer.wrap(("record-" + (i + 5)).getBytes(UTF_8));
- Promise<DLSN> writePromise = new Promise<DLSN>();
+ CompletableFuture<DLSN> writePromise = new CompletableFuture<>();
writer.writeRecord(record, writePromise);
writePromiseList.add(writePromise);
assertEquals((i + 6) + " records", (i + 6), writer.getNumRecords());
@@ -143,7 +142,7 @@ public class TestLogRecordSet {
// Test transmit complete
writer.completeTransmit(1L, 1L, 10L);
- List<DLSN> writeResults = Await.result(Future.collect(writePromiseList));
+ List<DLSN> writeResults = FutureUtils.result(FutureUtils.collect(writePromiseList));
for (int i = 0; i < 10; i++) {
assertEquals(new DLSN(1L, 1L, 10L + i), writeResults.get(i));
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/53fca4ac/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java b/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java
deleted file mode 100644
index 20cf53c..0000000
--- a/distributedlog-protocol/src/test/java/org/apache/distributedlog/TestTimedOutTestsListener.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.notification.Failure;
-
-/**
- * Test Case for {@link TimedOutTestsListener}.
- */
-public class TestTimedOutTestsListener {
-
- private static class Deadlock {
- private CyclicBarrier barrier = new CyclicBarrier(6);
-
- public Deadlock() {
- DeadlockThread[] dThreads = new DeadlockThread[6];
-
- Monitor a = new Monitor("a");
- Monitor b = new Monitor("b");
- Monitor c = new Monitor("c");
- dThreads[0] = new DeadlockThread("MThread-1", a, b);
- dThreads[1] = new DeadlockThread("MThread-2", b, c);
- dThreads[2] = new DeadlockThread("MThread-3", c, a);
-
- Lock d = new ReentrantLock();
- Lock e = new ReentrantLock();
- Lock f = new ReentrantLock();
-
- dThreads[3] = new DeadlockThread("SThread-4", d, e);
- dThreads[4] = new DeadlockThread("SThread-5", e, f);
- dThreads[5] = new DeadlockThread("SThread-6", f, d);
-
- // make them daemon threads so that the test will exit
- for (int i = 0; i < 6; i++) {
- dThreads[i].setDaemon(true);
- dThreads[i].start();
- }
- }
-
- class DeadlockThread extends Thread {
- private Lock lock1 = null;
-
- private Lock lock2 = null;
-
- private Monitor mon1 = null;
-
- private Monitor mon2 = null;
-
- private boolean useSync;
-
- DeadlockThread(String name, Lock lock1, Lock lock2) {
- super(name);
- this.lock1 = lock1;
- this.lock2 = lock2;
- this.useSync = true;
- }
-
- DeadlockThread(String name, Monitor mon1, Monitor mon2) {
- super(name);
- this.mon1 = mon1;
- this.mon2 = mon2;
- this.useSync = false;
- }
-
- public void run() {
- if (useSync) {
- syncLock();
- } else {
- monitorLock();
- }
- }
-
- private void syncLock() {
- lock1.lock();
- try {
- try {
- barrier.await();
- } catch (Exception e) {
- }
- goSyncDeadlock();
- } finally {
- lock1.unlock();
- }
- }
-
- private void goSyncDeadlock() {
- try {
- barrier.await();
- } catch (Exception e) {
- }
- lock2.lock();
- throw new RuntimeException("should not reach here.");
- }
-
- private void monitorLock() {
- synchronized (mon1) {
- try {
- barrier.await();
- } catch (Exception e) {
- }
- goMonitorDeadlock();
- }
- }
-
- private void goMonitorDeadlock() {
- try {
- barrier.await();
- } catch (Exception e) {
- }
- synchronized (mon2) {
- throw new RuntimeException(getName() + " should not reach here.");
- }
- }
- }
-
- class Monitor {
- String name;
-
- Monitor(String name) {
- this.name = name;
- }
- }
-
- }
-
- @Test(timeout = 500)
- public void testThreadDumpAndDeadlocks() throws Exception {
- new Deadlock();
- String s = null;
- while (true) {
- s = TimedOutTestsListener.buildDeadlockInfo();
- if (s != null) {
- break;
- }
- Thread.sleep(100);
- }
-
- Assert.assertEquals(3, countStringOccurrences(s, "BLOCKED"));
-
- Failure failure = new Failure(null, new Exception(TimedOutTestsListener.TEST_TIMED_OUT_PREFIX));
- StringWriter writer = new StringWriter();
- new TimedOutTestsListener(new PrintWriter(writer)).testFailure(failure);
- String out = writer.toString();
-
- Assert.assertTrue(out.contains("THREAD DUMP"));
- Assert.assertTrue(out.contains("DEADLOCKS DETECTED"));
-
- System.out.println(out);
- }
-
- private int countStringOccurrences(String s, String substr) {
- int n = 0;
- int index = 0;
- while ((index = s.indexOf(substr, index) + 1) != 0) {
- n++;
- }
- return n;
- }
-
-}