You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:17 UTC
[03/31] incubator-distributedlog git commit: DL-117: Stream metadata store
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index d874274..2a8c83b 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -27,6 +27,8 @@ import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.ZooKeeperClientUtils;
import com.twitter.distributedlog.callback.LogSegmentNamesListener;
import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
import com.twitter.distributedlog.util.DLUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
@@ -57,6 +59,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
/**
* Test ZK based log segment metadata store.
@@ -133,14 +136,14 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
public void testCreateLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
LogSegmentMetadata segment2 = createLogSegment(1L);
Transaction<Object> createTxn2 = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn2, segment2);
+ lsmStore.createLogSegment(createTxn2, segment2, null);
try {
FutureUtils.result(createTxn2.execute());
fail("Should fail if log segment exists");
@@ -158,13 +161,13 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
public void testDeleteLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
zkc.get().exists(segment.getZkPath(), false));
Transaction<Object> deleteTxn = lsmStore.transaction();
- lsmStore.deleteLogSegment(deleteTxn, segment);
+ lsmStore.deleteLogSegment(deleteTxn, segment, null);
FutureUtils.result(deleteTxn.execute());
assertNull("LogSegment " + segment + " should be deleted",
zkc.get().exists(segment.getZkPath(), false));
@@ -174,7 +177,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
public void testDeleteNonExistentLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L);
Transaction<Object> deleteTxn = lsmStore.transaction();
- lsmStore.deleteLogSegment(deleteTxn, segment);
+ lsmStore.deleteLogSegment(deleteTxn, segment, null);
try {
FutureUtils.result(deleteTxn.execute());
fail("Should fail deletion if log segment doesn't exist");
@@ -208,7 +211,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
public void testUpdateLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L, 99L);
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
@@ -230,15 +233,15 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
LogSegmentMetadata segment2 = createLogSegment(2L);
// create log segment 1
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment1);
+ lsmStore.createLogSegment(createTxn, segment1, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment1 + " should be created",
zkc.get().exists(segment1.getZkPath(), false));
// delete log segment 1 and create log segment 2
Transaction<Object> createDeleteTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createDeleteTxn, segment2);
- lsmStore.deleteLogSegment(createDeleteTxn, segment1);
+ lsmStore.createLogSegment(createDeleteTxn, segment2, null);
+ lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
FutureUtils.result(createDeleteTxn.execute());
// segment 1 should be deleted, segment 2 should be created
assertNull("LogSegment " + segment1 + " should be deleted",
@@ -254,16 +257,16 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
LogSegmentMetadata segment3 = createLogSegment(3L);
// create log segment 1
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment1);
+ lsmStore.createLogSegment(createTxn, segment1, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment1 + " should be created",
zkc.get().exists(segment1.getZkPath(), false));
// delete log segment 1 and delete log segment 2
Transaction<Object> createDeleteTxn = lsmStore.transaction();
- lsmStore.deleteLogSegment(createDeleteTxn, segment1);
- lsmStore.deleteLogSegment(createDeleteTxn, segment2);
- lsmStore.createLogSegment(createDeleteTxn, segment3);
+ lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
+ lsmStore.deleteLogSegment(createDeleteTxn, segment2, null);
+ lsmStore.createLogSegment(createDeleteTxn, segment3, null);
try {
FutureUtils.result(createDeleteTxn.execute());
fail("Should fail transaction if one operation failed");
@@ -286,7 +289,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
public void testGetLogSegment() throws Exception {
LogSegmentMetadata segment = createLogSegment(1L, 99L);
Transaction<Object> createTxn = lsmStore.transaction();
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
FutureUtils.result(createTxn.execute());
// the log segment should be created
assertNotNull("LogSegment " + segment + " should be created",
@@ -304,7 +307,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
for (int i = 0; i < 10; i++) {
LogSegmentMetadata segment = createLogSegment(i);
createdSegments.add(segment);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -353,7 +356,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -394,7 +397,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> anotherCreateTxn = lsmStore.transaction();
for (int i = numSegments; i < 2 * numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(anotherCreateTxn, segment);
+ lsmStore.createLogSegment(anotherCreateTxn, segment, null);
}
FutureUtils.result(anotherCreateTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -419,7 +422,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -459,7 +462,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> deleteTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.deleteLogSegment(deleteTxn, segment);
+ lsmStore.deleteLogSegment(deleteTxn, segment, null);
}
FutureUtils.result(deleteTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -491,7 +494,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -536,7 +539,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> anotherCreateTxn = lsmStore.transaction();
for (int i = numSegments; i < 2 * numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(anotherCreateTxn, segment);
+ lsmStore.createLogSegment(anotherCreateTxn, segment, null);
}
FutureUtils.result(anotherCreateTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -561,7 +564,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> createTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.createLogSegment(createTxn, segment);
+ lsmStore.createLogSegment(createTxn, segment, null);
}
FutureUtils.result(createTxn.execute());
String rootPath = "/" + runtime.getMethodName();
@@ -602,7 +605,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> deleteTxn = lsmStore.transaction();
for (int i = 0; i < numSegments; i++) {
LogSegmentMetadata segment = createLogSegment(i);
- lsmStore.deleteLogSegment(deleteTxn, segment);
+ lsmStore.deleteLogSegment(deleteTxn, segment, null);
}
FutureUtils.result(deleteTxn.execute());
List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -634,7 +637,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
final Promise<Version> result = new Promise<Version>();
- lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value,
+ ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+ when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
+ lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -659,7 +664,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
- lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value,
+ ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+ when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
+ lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -695,7 +702,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
String nonExistentPath = rootZkPath + "/non-existent";
- lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value,
+ ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+ when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath);
+ lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -726,7 +735,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
final Promise<Version> result = new Promise<Version>();
- lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
+ ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+ when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
+ lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -751,7 +762,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Transaction<Object> updateTxn = lsmStore.transaction();
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
- lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
+ ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+ when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
+ lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
@@ -787,7 +800,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
final Promise<Version> result = new Promise<Version>();
String nonExistentPath = rootZkPath + "/non-existent";
- lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value,
+ ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+ when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath);
+ lsmStore.storeMaxTxnId(updateTxn, metadata, value,
new Transaction.OpListener<Version>() {
@Override
public void onCommit(Version r) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
deleted file mode 100644
index 648b828..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.twitter.distributedlog.TestZooKeeperClientBuilder;
-import com.twitter.distributedlog.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.DLMetadata;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DLMTestUtil;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClusterTestCase;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.ZooDefs;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.List;
-
-import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
-import static org.junit.Assert.*;
-
-/**
- * Test {@link ZKLogMetadataForWriter}
- */
-public class TestZKLogMetadataForWriter extends ZooKeeperClusterTestCase {
-
- private static final Logger logger = LoggerFactory.getLogger(TestZKLogMetadataForWriter.class);
-
- private final static int sessionTimeoutMs = 30000;
-
- @Rule
- public TestName testName = new TestName();
-
- private ZooKeeperClient zkc;
- private URI uri;
-
- private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier)
- throws Exception {
- final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
- final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
- final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
- final String lockPath = logRootPath + LOCK_PATH;
- final String readLockPath = logRootPath + READ_LOCK_PATH;
- final String versionPath = logRootPath + VERSION_PATH;
- final String allocationPath = logRootPath + ALLOCATION_PATH;
-
- Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0],
- zk.getDefaultACL(), CreateMode.PERSISTENT);
- Transaction txn = zk.get().transaction();
- txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(
- DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
- zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
- zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES,
- zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
- zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(versionPath, ZKLogMetadataForWriter.intToBytes(LAYOUT_VERSION),
- zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
- zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.commit();
- }
-
- @Before
- public void setup() throws Exception {
- zkc = TestZooKeeperClientBuilder.newBuilder()
- .name("zkc")
- .uri(DLMTestUtil.createDLMURI(zkPort, "/"))
- .sessionTimeoutMs(sessionTimeoutMs)
- .build();
- uri = DLMTestUtil.createDLMURI(zkPort, "");
- try {
- ZkUtils.createFullPathOptimistic(
- zkc.get(),
- uri.getPath(),
- new byte[0],
- ZooDefs.Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nee) {
- logger.debug("The namespace uri already exists.");
- }
- }
-
- @After
- public void teardown() throws Exception {
- zkc.close();
- }
-
- @Test(timeout = 60000)
- public void testCheckLogMetadataPathsWithAllocator() throws Exception {
- String logRootPath = "/" + testName.getMethodName();
- List<Versioned<byte[]>> metadatas =
- FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(
- zkc.get(), logRootPath, true));
- assertEquals("Should have 8 paths",
- 8, metadatas.size());
- for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
- assertNull(path.getValue());
- assertNull(path.getVersion());
- }
- }
-
- @Test(timeout = 60000)
- public void testCheckLogMetadataPathsWithoutAllocator() throws Exception {
- String logRootPath = "/" + testName.getMethodName();
- List<Versioned<byte[]>> metadatas =
- FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(
- zkc.get(), logRootPath, false));
- assertEquals("Should have 7 paths",
- 7, metadatas.size());
- for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
- assertNull(path.getValue());
- assertNull(path.getVersion());
- }
- }
-
- private void testCreateLogMetadataWithMissingPaths(URI uri,
- String logName,
- String logIdentifier,
- List<String> pathsToDelete,
- boolean ownAllocator,
- boolean createLogFirst)
- throws Exception {
- if (createLogFirst) {
- createLog(zkc, uri, logName, logIdentifier);
- }
- // delete a path
- for (String path : pathsToDelete) {
- zkc.get().delete(path, -1);
- }
-
- ZKLogMetadataForWriter logMetadata =
- FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier,
- zkc.get(), zkc.getDefaultACL(), ownAllocator, true));
-
- final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-
- List<Versioned<byte[]>> metadatas =
- FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
-
- if (ownAllocator) {
- assertEquals("Should have 8 paths : ownAllocator = " + ownAllocator,
- 8, metadatas.size());
- } else {
- assertEquals("Should have 7 paths : ownAllocator = " + ownAllocator,
- 7, metadatas.size());
- }
-
- for (Versioned<byte[]> metadata : metadatas) {
- assertTrue(ZKLogMetadataForWriter.pathExists(metadata));
- assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0);
- }
-
- Versioned<byte[]> logSegmentsData = logMetadata.getMaxLSSNData();
-
- assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO,
- DLUtils.deserializeLogSegmentSequenceNumber(logSegmentsData.getValue()));
-
- Versioned<byte[]> maxTxIdData = logMetadata.getMaxTxIdData();
-
- assertEquals(0L, DLUtils.deserializeTransactionId(maxTxIdData.getValue()));
-
- if (ownAllocator) {
- Versioned<byte[]> allocationData = logMetadata.getAllocationData();
- assertEquals(0, allocationData.getValue().length);
- }
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataMissingLogSegmentsPath() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- String logRootPath = getLogRootPath(uri, logName, logIdentifier);
- List<String> pathsToDelete = Lists.newArrayList(
- logRootPath + LOGSEGMENTS_PATH);
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataMissingMaxTxIdPath() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- String logRootPath = getLogRootPath(uri, logName, logIdentifier);
- List<String> pathsToDelete = Lists.newArrayList(
- logRootPath + MAX_TXID_PATH);
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataMissingLockPath() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- String logRootPath = getLogRootPath(uri, logName, logIdentifier);
- List<String> pathsToDelete = Lists.newArrayList(
- logRootPath + LOCK_PATH);
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataMissingReadLockPath() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- String logRootPath = getLogRootPath(uri, logName, logIdentifier);
- List<String> pathsToDelete = Lists.newArrayList(
- logRootPath + READ_LOCK_PATH);
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataMissingVersionPath() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- String logRootPath = getLogRootPath(uri, logName, logIdentifier);
- List<String> pathsToDelete = Lists.newArrayList(
- logRootPath + VERSION_PATH);
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataMissingAllocatorPath() throws Exception {
- URI uri = DLMTestUtil.createDLMURI(zkPort, "");
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- String logRootPath = getLogRootPath(uri, logName, logIdentifier);
- List<String> pathsToDelete = Lists.newArrayList(
- logRootPath + ALLOCATION_PATH);
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataMissingAllPath() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- String logRootPath = getLogRootPath(uri, logName, logIdentifier);
- List<String> pathsToDelete = Lists.newArrayList(
- logRootPath + LOGSEGMENTS_PATH,
- logRootPath + MAX_TXID_PATH,
- logRootPath + LOCK_PATH,
- logRootPath + READ_LOCK_PATH,
- logRootPath + VERSION_PATH,
- logRootPath + ALLOCATION_PATH);
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataOnExistedLog() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- List<String> pathsToDelete = Lists.newArrayList();
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadata() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- List<String> pathsToDelete = Lists.newArrayList();
-
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
- }
-
- @Test(timeout = 60000, expected = LogNotFoundException.class)
- public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier,
- zkc.get(), zkc.getDefaultACL(), true, false));
- }
-
- @Test(timeout = 60000)
- public void testCreateLogMetadataWithCustomMetadata() throws Exception {
- String logName = testName.getMethodName();
- String logIdentifier = "<default>";
- List<String> pathsToDelete = Lists.newArrayList();
-
- DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(uri);
-
- DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
- .conf(new DistributedLogConfiguration())
- .uri(uri)
- .build();
-
- DistributedLogManager dlm = namespace.openLog(logName);
- dlm.createOrUpdateMetadata(logName.getBytes("UTF-8"));
- dlm.close();
-
- testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
deleted file mode 100644
index ce67c30..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DLMTestUtil;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.util.DLUtils;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-public class TestZKLogMetadataForWriterUtilFunctions {
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000, expected = UnexpectedException.class)
- public void testProcessLogMetadatasMissingMaxTxnId() throws Exception {
- String rootPath = "/test-missing-max-txn-id";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000, expected = UnexpectedException.class)
- public void testProcessLogMetadatasMissingVersion() throws Exception {
- String rootPath = "/test-missing-version";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000, expected = UnexpectedException.class)
- public void testProcessLogMetadatasWrongVersion() throws Exception {
- String rootPath = "/test-missing-version";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(9999), null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000, expected = UnexpectedException.class)
- public void testProcessLogMetadatasMissingLockPath() throws Exception {
- String rootPath = "/test-missing-version";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
- new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000, expected = UnexpectedException.class)
- public void testProcessLogMetadatasMissingReadLockPath() throws Exception {
- String rootPath = "/test-missing-version";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000, expected = UnexpectedException.class)
- public void testProcessLogMetadatasMissingLogSegmentsPath() throws Exception {
- String rootPath = "/test-missing-version";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000, expected = UnexpectedException.class)
- public void testProcessLogMetadatasMissingAllocatorPath() throws Exception {
- String rootPath = "/test-missing-version";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)),
- new Versioned<byte[]>(null, null));
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000)
- public void testProcessLogMetadatasNoAllocatorPath() throws Exception {
- String rootPath = "/test-missing-version";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- Versioned<byte[]> maxTxnIdData =
- new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1));
- Versioned<byte[]> logSegmentsData =
- new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1));
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- maxTxnIdData,
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- logSegmentsData);
- ZKLogMetadataForWriter metadata =
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
- assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
- assertTrue(logSegmentsData == metadata.getMaxLSSNData());
- assertNull(metadata.getAllocationData().getValue());
- assertNull(metadata.getAllocationData().getVersion());
- }
-
- @SuppressWarnings("unchecked")
- @Test(timeout = 60000)
- public void testProcessLogMetadatasAllocatorPath() throws Exception {
- String rootPath = "/test-missing-version";
- URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
- String logName = "test-log";
- String logIdentifier = "<default>";
- Versioned<byte[]> maxTxnIdData =
- new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1));
- Versioned<byte[]> logSegmentsData =
- new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1));
- Versioned<byte[]> allocationData =
- new Versioned<byte[]>(DLUtils.ledgerId2Bytes(1L), new ZkVersion(1));
- List<Versioned<byte[]>> metadatas = Lists.newArrayList(
- new Versioned<byte[]>(null, null),
- new Versioned<byte[]>(null, null),
- maxTxnIdData,
- new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
- logSegmentsData,
- allocationData);
- ZKLogMetadataForWriter metadata =
- ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
- assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
- assertTrue(logSegmentsData == metadata.getMaxLSSNData());
- assertTrue(allocationData == metadata.getAllocationData());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
new file mode 100644
index 0000000..9a08aa0
--- /dev/null
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.twitter.distributedlog.TestZooKeeperClientBuilder;
+import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.DLMetadata;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DLMTestUtil;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.ZooKeeperClusterTestCase;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.List;
+
+import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
+import static org.junit.Assert.*;
+
+/**
+ * Test {@link ZKLogStreamMetadataStore}
+ */
+public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestZKLogStreamMetadataStore.class);
+
+    private final static int sessionTimeoutMs = 30000;
+
+    /** Log identifier used by every test in this class. */
+    private static final String DEFAULT_LOG_IDENTIFIER = "<default>";
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private ZooKeeperClient zkc;
+    private URI uri;
+
+    /**
+     * Creates the full znode layout of a log directly in ZooKeeper, bypassing the metadata store.
+     *
+     * <p>The log root is created first (with any missing parents); the six child znodes are then
+     * created in a single ZooKeeper transaction.
+     *
+     * @param zk zookeeper client used to create the znodes
+     * @param uri namespace uri the log lives under
+     * @param logName name of the log
+     * @param logIdentifier identifier of the log
+     * @throws Exception if any ZooKeeper operation fails
+     */
+    private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier)
+            throws Exception {
+        final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
+        final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
+        final String lockPath = logRootPath + LOCK_PATH;
+        final String readLockPath = logRootPath + READ_LOCK_PATH;
+        final String versionPath = logRootPath + VERSION_PATH;
+        final String allocationPath = logRootPath + ALLOCATION_PATH;
+
+        Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0],
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        Transaction txn = zk.get().transaction();
+        txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(
+                        DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(versionPath, intToBytes(LAYOUT_VERSION),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.commit();
+    }
+
+    /**
+     * Builds the ZooKeeper client and makes sure the namespace root znode exists.
+     */
+    @Before
+    public void setup() throws Exception {
+        zkc = TestZooKeeperClientBuilder.newBuilder()
+                .name("zkc")
+                .uri(DLMTestUtil.createDLMURI(zkPort, "/"))
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .build();
+        uri = DLMTestUtil.createDLMURI(zkPort, "");
+        try {
+            ZkUtils.createFullPathOptimistic(
+                    zkc.get(),
+                    uri.getPath(),
+                    new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException nee) {
+            logger.debug("The namespace uri already exists.");
+        }
+    }
+
+    /**
+     * Closes the ZooKeeper client created by {@link #setup()}.
+     */
+    @After
+    public void teardown() throws Exception {
+        // setup() may have failed before the client was built; do not mask that failure with an NPE.
+        if (null != zkc) {
+            zkc.close();
+        }
+    }
+
+    /**
+     * Checks the metadata paths of a log that was never created.
+     *
+     * <p>Asserts the number of returned entries and that every entry from index 2 onwards carries
+     * neither a value nor a version. The first two entries are not inspected here (presumably they
+     * describe ancestor paths of the log - TODO confirm against checkLogMetadataPaths).
+     *
+     * @param ownAllocator whether the allocation path is included in the check
+     * @param expectedPaths number of entries the check is expected to return
+     */
+    private void assertPathsAbsentForMissingLog(boolean ownAllocator, int expectedPaths)
+            throws Exception {
+        String logRootPath = "/" + testName.getMethodName();
+        List<Versioned<byte[]>> metadatas =
+                FutureUtils.result(checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
+        assertEquals("Should have " + expectedPaths + " paths",
+                expectedPaths, metadatas.size());
+        for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
+            assertNull(path.getValue());
+            assertNull(path.getVersion());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckLogMetadataPathsWithAllocator() throws Exception {
+        assertPathsAbsentForMissingLog(true, 8);
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckLogMetadataPathsWithoutAllocator() throws Exception {
+        assertPathsAbsentForMissingLog(false, 7);
+    }
+
+    /**
+     * Optionally creates a log, deletes the given znodes, then fetches the log through
+     * {@code getLog} and verifies that the complete layout is present afterwards.
+     *
+     * @param uri namespace uri
+     * @param logName name of the log
+     * @param logIdentifier identifier of the log
+     * @param pathsToDelete absolute znode paths removed before the log is fetched
+     * @param ownAllocator whether the allocation path is part of the layout under test
+     * @param createLogFirst whether to create the full layout before deleting paths
+     */
+    private void testCreateLogMetadataWithMissingPaths(URI uri,
+                                                       String logName,
+                                                       String logIdentifier,
+                                                       List<String> pathsToDelete,
+                                                       boolean ownAllocator,
+                                                       boolean createLogFirst)
+            throws Exception {
+        if (createLogFirst) {
+            createLog(zkc, uri, logName, logIdentifier);
+        }
+        // remove the requested znodes regardless of their version
+        for (String path : pathsToDelete) {
+            zkc.get().delete(path, -1);
+        }
+
+        // the last argument is 'true' here; the createIfNotExists test below passes 'false'
+        ZKLogMetadataForWriter logMetadata =
+                FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, ownAllocator, true));
+
+        final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+
+        List<Versioned<byte[]>> metadatas =
+                FutureUtils.result(checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
+
+        int expectedPaths = ownAllocator ? 8 : 7;
+        assertEquals("Should have " + expectedPaths + " paths : ownAllocator = " + ownAllocator,
+                expectedPaths, metadatas.size());
+
+        for (Versioned<byte[]> metadata : metadatas) {
+            assertTrue(pathExists(metadata));
+            assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0);
+        }
+
+        Versioned<byte[]> logSegmentsData = logMetadata.getMaxLSSNData();
+
+        assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO,
+                DLUtils.deserializeLogSegmentSequenceNumber(logSegmentsData.getValue()));
+
+        Versioned<byte[]> maxTxIdData = logMetadata.getMaxTxIdData();
+
+        assertEquals(0L, DLUtils.deserializeTransactionId(maxTxIdData.getValue()));
+
+        if (ownAllocator) {
+            Versioned<byte[]> allocationData = logMetadata.getAllocationData();
+            assertEquals(0, allocationData.getValue().length);
+        }
+    }
+
+    /**
+     * Creates a log named after the running test, deletes the given children of its root and
+     * verifies that fetching the log restores the full layout.
+     *
+     * @param ownAllocator whether the allocation path is part of the layout under test
+     * @param pathSuffixes child path suffixes (relative to the log root) to delete
+     */
+    private void verifyLayoutRestoredAfterDeleting(boolean ownAllocator, String... pathSuffixes)
+            throws Exception {
+        String logName = testName.getMethodName();
+        String logRootPath = getLogRootPath(uri, logName, DEFAULT_LOG_IDENTIFIER);
+        List<String> pathsToDelete = Lists.newArrayList();
+        for (String suffix : pathSuffixes) {
+            pathsToDelete.add(logRootPath + suffix);
+        }
+        testCreateLogMetadataWithMissingPaths(
+                uri, logName, DEFAULT_LOG_IDENTIFIER, pathsToDelete, ownAllocator, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingLogSegmentsPath() throws Exception {
+        verifyLayoutRestoredAfterDeleting(false, LOGSEGMENTS_PATH);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingMaxTxIdPath() throws Exception {
+        verifyLayoutRestoredAfterDeleting(false, MAX_TXID_PATH);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingLockPath() throws Exception {
+        verifyLayoutRestoredAfterDeleting(false, LOCK_PATH);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingReadLockPath() throws Exception {
+        verifyLayoutRestoredAfterDeleting(false, READ_LOCK_PATH);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingVersionPath() throws Exception {
+        verifyLayoutRestoredAfterDeleting(false, VERSION_PATH);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingAllocatorPath() throws Exception {
+        // uses the uri built in setup(); a local copy with the same value is unnecessary
+        verifyLayoutRestoredAfterDeleting(true, ALLOCATION_PATH);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingAllPath() throws Exception {
+        verifyLayoutRestoredAfterDeleting(true,
+                LOGSEGMENTS_PATH,
+                MAX_TXID_PATH,
+                LOCK_PATH,
+                READ_LOCK_PATH,
+                VERSION_PATH,
+                ALLOCATION_PATH);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataOnExistedLog() throws Exception {
+        // nothing is deleted: the log is fetched on top of a complete, pre-created layout
+        verifyLayoutRestoredAfterDeleting(true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadata() throws Exception {
+        String logName = testName.getMethodName();
+        List<String> pathsToDelete = Lists.newArrayList();
+
+        // the log is not pre-created, so getLog has to build the whole layout itself
+        testCreateLogMetadataWithMissingPaths(
+                uri, logName, DEFAULT_LOG_IDENTIFIER, pathsToDelete, true, false);
+    }
+
+    @Test(timeout = 60000, expected = LogNotFoundException.class)
+    public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception {
+        String logName = testName.getMethodName();
+        FutureUtils.result(getLog(uri, logName, DEFAULT_LOG_IDENTIFIER, zkc, true, false));
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataWithCustomMetadata() throws Exception {
+        String logName = testName.getMethodName();
+        List<String> pathsToDelete = Lists.newArrayList();
+
+        DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(uri);
+
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+                .conf(new DistributedLogConfiguration())
+                .uri(uri)
+                .build();
+        try {
+            DistributedLogManager dlm = namespace.openLog(logName);
+            try {
+                dlm.createOrUpdateMetadata(logName.getBytes("UTF-8"));
+            } finally {
+                dlm.close();
+            }
+        } finally {
+            // release the namespace's own resources; the test only needs the znodes it left behind
+            namespace.close();
+        }
+
+        testCreateLogMetadataWithMissingPaths(
+                uri, logName, DEFAULT_LOG_IDENTIFIER, pathsToDelete, true, false);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
new file mode 100644
index 0000000..f14a217
--- /dev/null
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DLMTestUtil;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.util.DLUtils;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.List;
+
+import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
+import static org.junit.Assert.*;
+
+public class TestZKLogStreamMetadataStoreUtils {
+
+    private static final String LOG_NAME = "test-log";
+    private static final String LOG_IDENTIFIER = "<default>";
+
+    /** Builds the namespace uri handed to {@code processLogMetadatas} for the given root path. */
+    private static URI uriFor(String rootPath) throws Exception {
+        return DLMTestUtil.createDLMURI(2181, rootPath);
+    }
+
+    /** An entry with neither a value nor a version. */
+    private static Versioned<byte[]> absent() {
+        return new Versioned<byte[]>(null, null);
+    }
+
+    /** An entry holding {@code data} at znode version 1. */
+    private static Versioned<byte[]> stored(byte[] data) {
+        return new Versioned<byte[]>(data, new ZkVersion(1));
+    }
+
+    /** An entry holding the encoded layout version, with no version attached. */
+    private static Versioned<byte[]> layout(int layoutVersion) {
+        return new Versioned<byte[]>(intToBytes(layoutVersion), null);
+    }
+
+    // Entry positions used by the tests below, as implied by the test names and the payloads:
+    //   [2] max txn id, [3] layout version, [4] lock, [5] read lock,
+    //   [6] log segments (max sequence number), [7] allocation (only with ownAllocator).
+    // Entries [0] and [1] are always absent here.
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingMaxTxnId() throws Exception {
+        URI uri = uriFor("/test-missing-max-txn-id");
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                absent());
+        processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingVersion() throws Exception {
+        URI uri = uriFor("/test-missing-version");
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                stored(DLUtils.serializeTransactionId(1L)),
+                absent());
+        processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasWrongVersion() throws Exception {
+        URI uri = uriFor("/test-missing-version");
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                stored(DLUtils.serializeTransactionId(1L)),
+                layout(9999));
+        processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingLockPath() throws Exception {
+        URI uri = uriFor("/test-missing-version");
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                stored(DLUtils.serializeTransactionId(1L)),
+                layout(ZKLogMetadata.LAYOUT_VERSION),
+                absent());
+        processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingReadLockPath() throws Exception {
+        URI uri = uriFor("/test-missing-version");
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                stored(DLUtils.serializeTransactionId(1L)),
+                layout(ZKLogMetadata.LAYOUT_VERSION),
+                stored(new byte[0]),
+                absent());
+        processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingLogSegmentsPath() throws Exception {
+        URI uri = uriFor("/test-missing-version");
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                stored(DLUtils.serializeTransactionId(1L)),
+                layout(ZKLogMetadata.LAYOUT_VERSION),
+                stored(new byte[0]),
+                stored(new byte[0]),
+                absent());
+        processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingAllocatorPath() throws Exception {
+        URI uri = uriFor("/test-missing-version");
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                stored(DLUtils.serializeTransactionId(1L)),
+                layout(ZKLogMetadata.LAYOUT_VERSION),
+                stored(new byte[0]),
+                stored(new byte[0]),
+                stored(DLUtils.serializeLogSegmentSequenceNumber(1L)),
+                absent());
+        processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, true);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000)
+    public void testProcessLogMetadatasNoAllocatorPath() throws Exception {
+        URI uri = uriFor("/test-missing-version");
+        Versioned<byte[]> maxTxnIdData = stored(DLUtils.serializeTransactionId(1L));
+        Versioned<byte[]> logSegmentsData = stored(DLUtils.serializeLogSegmentSequenceNumber(1L));
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                maxTxnIdData,
+                layout(ZKLogMetadata.LAYOUT_VERSION),
+                stored(new byte[0]),
+                stored(new byte[0]),
+                logSegmentsData);
+        ZKLogMetadataForWriter metadata =
+                processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, false);
+        // the very same Versioned instances must be handed back, not copies
+        assertSame(maxTxnIdData, metadata.getMaxTxIdData());
+        assertSame(logSegmentsData, metadata.getMaxLSSNData());
+        assertNull(metadata.getAllocationData().getValue());
+        assertNull(metadata.getAllocationData().getVersion());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000)
+    public void testProcessLogMetadatasAllocatorPath() throws Exception {
+        URI uri = uriFor("/test-missing-version");
+        Versioned<byte[]> maxTxnIdData = stored(DLUtils.serializeTransactionId(1L));
+        Versioned<byte[]> logSegmentsData = stored(DLUtils.serializeLogSegmentSequenceNumber(1L));
+        Versioned<byte[]> allocationData = stored(DLUtils.ledgerId2Bytes(1L));
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                absent(),
+                absent(),
+                maxTxnIdData,
+                layout(ZKLogMetadata.LAYOUT_VERSION),
+                stored(new byte[0]),
+                stored(new byte[0]),
+                logSegmentsData,
+                allocationData);
+        ZKLogMetadataForWriter metadata =
+                processLogMetadatas(uri, LOG_NAME, LOG_IDENTIFIER, metadatas, true);
+        // the very same Versioned instances must be handed back, not copies
+        assertSame(maxTxnIdData, metadata.getMaxTxIdData());
+        assertSame(logSegmentsData, metadata.getMaxLSSNData());
+        assertSame(allocationData, metadata.getAllocationData());
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
index 8899c0e..db87a65 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
@@ -17,6 +17,7 @@
*/
package com.twitter.distributedlog.util;
+import com.twitter.distributedlog.zk.LimitedPermitManager;
import org.junit.Test;
import java.util.ArrayList;