You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/30 00:07:17 UTC

[03/31] incubator-distributedlog git commit: DL-117: Stream metadata store

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index d874274..2a8c83b 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -27,6 +27,8 @@ import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.ZooKeeperClientUtils;
 import com.twitter.distributedlog.callback.LogSegmentNamesListener;
 import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadata;
+import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForWriter;
 import com.twitter.distributedlog.util.DLUtils;
 import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
@@ -57,6 +59,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
 
 /**
  * Test ZK based log segment metadata store.
@@ -133,14 +136,14 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
     public void testCreateLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L);
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment);
+        lsmStore.createLogSegment(createTxn, segment, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
                 zkc.get().exists(segment.getZkPath(), false));
         LogSegmentMetadata segment2 = createLogSegment(1L);
         Transaction<Object> createTxn2 = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn2, segment2);
+        lsmStore.createLogSegment(createTxn2, segment2, null);
         try {
             FutureUtils.result(createTxn2.execute());
             fail("Should fail if log segment exists");
@@ -158,13 +161,13 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
     public void testDeleteLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L);
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment);
+        lsmStore.createLogSegment(createTxn, segment, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
                 zkc.get().exists(segment.getZkPath(), false));
         Transaction<Object> deleteTxn = lsmStore.transaction();
-        lsmStore.deleteLogSegment(deleteTxn, segment);
+        lsmStore.deleteLogSegment(deleteTxn, segment, null);
         FutureUtils.result(deleteTxn.execute());
         assertNull("LogSegment " + segment + " should be deleted",
                 zkc.get().exists(segment.getZkPath(), false));
@@ -174,7 +177,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
     public void testDeleteNonExistentLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L);
         Transaction<Object> deleteTxn = lsmStore.transaction();
-        lsmStore.deleteLogSegment(deleteTxn, segment);
+        lsmStore.deleteLogSegment(deleteTxn, segment, null);
         try {
             FutureUtils.result(deleteTxn.execute());
             fail("Should fail deletion if log segment doesn't exist");
@@ -208,7 +211,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
     public void testUpdateLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L, 99L);
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment);
+        lsmStore.createLogSegment(createTxn, segment, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
@@ -230,15 +233,15 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         LogSegmentMetadata segment2 = createLogSegment(2L);
         // create log segment 1
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment1);
+        lsmStore.createLogSegment(createTxn, segment1, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment1 + " should be created",
                 zkc.get().exists(segment1.getZkPath(), false));
         // delete log segment 1 and create log segment 2
         Transaction<Object> createDeleteTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createDeleteTxn, segment2);
-        lsmStore.deleteLogSegment(createDeleteTxn, segment1);
+        lsmStore.createLogSegment(createDeleteTxn, segment2, null);
+        lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
         FutureUtils.result(createDeleteTxn.execute());
         // segment 1 should be deleted, segment 2 should be created
         assertNull("LogSegment " + segment1 + " should be deleted",
@@ -254,16 +257,16 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         LogSegmentMetadata segment3 = createLogSegment(3L);
         // create log segment 1
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment1);
+        lsmStore.createLogSegment(createTxn, segment1, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment1 + " should be created",
                 zkc.get().exists(segment1.getZkPath(), false));
         // delete log segment 1 and delete log segment 2
         Transaction<Object> createDeleteTxn = lsmStore.transaction();
-        lsmStore.deleteLogSegment(createDeleteTxn, segment1);
-        lsmStore.deleteLogSegment(createDeleteTxn, segment2);
-        lsmStore.createLogSegment(createDeleteTxn, segment3);
+        lsmStore.deleteLogSegment(createDeleteTxn, segment1, null);
+        lsmStore.deleteLogSegment(createDeleteTxn, segment2, null);
+        lsmStore.createLogSegment(createDeleteTxn, segment3, null);
         try {
             FutureUtils.result(createDeleteTxn.execute());
             fail("Should fail transaction if one operation failed");
@@ -286,7 +289,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
     public void testGetLogSegment() throws Exception {
         LogSegmentMetadata segment = createLogSegment(1L, 99L);
         Transaction<Object> createTxn = lsmStore.transaction();
-        lsmStore.createLogSegment(createTxn, segment);
+        lsmStore.createLogSegment(createTxn, segment, null);
         FutureUtils.result(createTxn.execute());
         // the log segment should be created
         assertNotNull("LogSegment " + segment + " should be created",
@@ -304,7 +307,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         for (int i = 0; i < 10; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
             createdSegments.add(segment);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -353,7 +356,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> createTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -394,7 +397,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> anotherCreateTxn = lsmStore.transaction();
         for (int i = numSegments; i < 2 * numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(anotherCreateTxn, segment);
+            lsmStore.createLogSegment(anotherCreateTxn, segment, null);
         }
         FutureUtils.result(anotherCreateTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -419,7 +422,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> createTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -459,7 +462,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> deleteTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.deleteLogSegment(deleteTxn, segment);
+            lsmStore.deleteLogSegment(deleteTxn, segment, null);
         }
         FutureUtils.result(deleteTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -491,7 +494,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> createTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -536,7 +539,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> anotherCreateTxn = lsmStore.transaction();
         for (int i = numSegments; i < 2 * numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(anotherCreateTxn, segment);
+            lsmStore.createLogSegment(anotherCreateTxn, segment, null);
         }
         FutureUtils.result(anotherCreateTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -561,7 +564,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> createTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.createLogSegment(createTxn, segment);
+            lsmStore.createLogSegment(createTxn, segment, null);
         }
         FutureUtils.result(createTxn.execute());
         String rootPath = "/" + runtime.getMethodName();
@@ -602,7 +605,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> deleteTxn = lsmStore.transaction();
         for (int i = 0; i < numSegments; i++) {
             LogSegmentMetadata segment = createLogSegment(i);
-            lsmStore.deleteLogSegment(deleteTxn, segment);
+            lsmStore.deleteLogSegment(deleteTxn, segment, null);
         }
         FutureUtils.result(deleteTxn.execute());
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
@@ -634,7 +637,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
         final Promise<Version> result = new Promise<Version>();
-        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value,
+        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
+        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
             @Override
             public void onCommit(Version r) {
@@ -659,7 +664,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
-        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, rootZkPath, value,
+        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        when(metadata.getLogSegmentsPath()).thenReturn(rootZkPath);
+        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
@@ -695,7 +702,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
-        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value,
+        ZKLogMetadata metadata = mock(ZKLogMetadata.class);
+        when(metadata.getLogSegmentsPath()).thenReturn(nonExistentPath);
+        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
@@ -726,7 +735,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(0));
         final Promise<Version> result = new Promise<Version>();
-        lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
+        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
+        lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
             @Override
             public void onCommit(Version r) {
@@ -751,7 +762,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Transaction<Object> updateTxn = lsmStore.transaction();
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
-        lsmStore.storeMaxTxnId(updateTxn, rootZkPath, value,
+        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        when(metadata.getMaxTxIdPath()).thenReturn(rootZkPath);
+        lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {
@@ -787,7 +800,9 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         Versioned<Long> value = new Versioned<Long>(999L, new ZkVersion(10));
         final Promise<Version> result = new Promise<Version>();
         String nonExistentPath = rootZkPath + "/non-existent";
-        lsmStore.storeMaxLogSegmentSequenceNumber(updateTxn, nonExistentPath, value,
+        ZKLogMetadataForWriter metadata = mock(ZKLogMetadataForWriter.class);
+        when(metadata.getMaxTxIdPath()).thenReturn(nonExistentPath);
+        lsmStore.storeMaxTxnId(updateTxn, metadata, value,
                 new Transaction.OpListener<Version>() {
                     @Override
                     public void onCommit(Version r) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
deleted file mode 100644
index 648b828..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriter.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.twitter.distributedlog.TestZooKeeperClientBuilder;
-import com.twitter.distributedlog.metadata.BKDLConfig;
-import com.twitter.distributedlog.metadata.DLMetadata;
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DLMTestUtil;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
-import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.DistributedLogConstants;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.ZooKeeperClusterTestCase;
-import com.twitter.distributedlog.util.DLUtils;
-import com.twitter.distributedlog.util.FutureUtils;
-import com.twitter.distributedlog.util.Utils;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.util.ZkUtils;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Transaction;
-import org.apache.zookeeper.ZooDefs;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.List;
-
-import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
-import static org.junit.Assert.*;
-
-/**
- * Test {@link ZKLogMetadataForWriter}
- */
-public class TestZKLogMetadataForWriter extends ZooKeeperClusterTestCase {
-
-    private static final Logger logger = LoggerFactory.getLogger(TestZKLogMetadataForWriter.class);
-
-    private final static int sessionTimeoutMs = 30000;
-
-    @Rule
-    public TestName testName = new TestName();
-
-    private ZooKeeperClient zkc;
-    private URI uri;
-
-    private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier)
-            throws Exception {
-        final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
-        final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
-        final String lockPath = logRootPath + LOCK_PATH;
-        final String readLockPath = logRootPath + READ_LOCK_PATH;
-        final String versionPath = logRootPath + VERSION_PATH;
-        final String allocationPath = logRootPath + ALLOCATION_PATH;
-
-        Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0],
-                zk.getDefaultACL(), CreateMode.PERSISTENT);
-        Transaction txn = zk.get().transaction();
-        txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(
-                        DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
-                zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
-                zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES,
-                zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
-                zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(versionPath, ZKLogMetadataForWriter.intToBytes(LAYOUT_VERSION),
-                zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
-                zk.getDefaultACL(), CreateMode.PERSISTENT);
-        txn.commit();
-    }
-
-    @Before
-    public void setup() throws Exception {
-        zkc = TestZooKeeperClientBuilder.newBuilder()
-                .name("zkc")
-                .uri(DLMTestUtil.createDLMURI(zkPort, "/"))
-                .sessionTimeoutMs(sessionTimeoutMs)
-                .build();
-        uri = DLMTestUtil.createDLMURI(zkPort, "");
-        try {
-            ZkUtils.createFullPathOptimistic(
-                    zkc.get(),
-                    uri.getPath(),
-                    new byte[0],
-                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
-                    CreateMode.PERSISTENT);
-        } catch (KeeperException.NodeExistsException nee) {
-            logger.debug("The namespace uri already exists.");
-        }
-    }
-
-    @After
-    public void teardown() throws Exception {
-        zkc.close();
-    }
-
-    @Test(timeout = 60000)
-    public void testCheckLogMetadataPathsWithAllocator() throws Exception {
-        String logRootPath = "/" + testName.getMethodName();
-        List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(
-                        zkc.get(), logRootPath, true));
-        assertEquals("Should have 8 paths",
-                8, metadatas.size());
-        for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
-            assertNull(path.getValue());
-            assertNull(path.getVersion());
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testCheckLogMetadataPathsWithoutAllocator() throws Exception {
-        String logRootPath = "/" + testName.getMethodName();
-        List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(
-                        zkc.get(), logRootPath, false));
-        assertEquals("Should have 7 paths",
-                7, metadatas.size());
-        for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
-            assertNull(path.getValue());
-            assertNull(path.getVersion());
-        }
-    }
-
-    private void testCreateLogMetadataWithMissingPaths(URI uri,
-                                                       String logName,
-                                                       String logIdentifier,
-                                                       List<String> pathsToDelete,
-                                                       boolean ownAllocator,
-                                                       boolean createLogFirst)
-            throws Exception {
-        if (createLogFirst) {
-            createLog(zkc, uri, logName, logIdentifier);
-        }
-        // delete a path
-        for (String path : pathsToDelete) {
-            zkc.get().delete(path, -1);
-        }
-
-        ZKLogMetadataForWriter logMetadata =
-                FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier,
-                        zkc.get(), zkc.getDefaultACL(), ownAllocator, true));
-
-        final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-
-        List<Versioned<byte[]>> metadatas =
-                FutureUtils.result(ZKLogMetadataForWriter.checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
-
-        if (ownAllocator) {
-            assertEquals("Should have 8 paths : ownAllocator = " + ownAllocator,
-                    8, metadatas.size());
-        } else {
-            assertEquals("Should have 7 paths : ownAllocator = " + ownAllocator,
-                    7, metadatas.size());
-        }
-
-        for (Versioned<byte[]> metadata : metadatas) {
-            assertTrue(ZKLogMetadataForWriter.pathExists(metadata));
-            assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0);
-        }
-
-        Versioned<byte[]> logSegmentsData = logMetadata.getMaxLSSNData();
-
-        assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO,
-                DLUtils.deserializeLogSegmentSequenceNumber(logSegmentsData.getValue()));
-
-        Versioned<byte[]> maxTxIdData = logMetadata.getMaxTxIdData();
-
-        assertEquals(0L, DLUtils.deserializeTransactionId(maxTxIdData.getValue()));
-
-        if (ownAllocator) {
-            Versioned<byte[]> allocationData = logMetadata.getAllocationData();
-            assertEquals(0, allocationData.getValue().length);
-        }
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataMissingLogSegmentsPath() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        List<String> pathsToDelete = Lists.newArrayList(
-                logRootPath + LOGSEGMENTS_PATH);
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataMissingMaxTxIdPath() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        List<String> pathsToDelete = Lists.newArrayList(
-                logRootPath + MAX_TXID_PATH);
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataMissingLockPath() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        List<String> pathsToDelete = Lists.newArrayList(
-                logRootPath + LOCK_PATH);
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataMissingReadLockPath() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        List<String> pathsToDelete = Lists.newArrayList(
-                logRootPath + READ_LOCK_PATH);
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataMissingVersionPath() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        List<String> pathsToDelete = Lists.newArrayList(
-                logRootPath + VERSION_PATH);
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataMissingAllocatorPath() throws Exception {
-        URI uri = DLMTestUtil.createDLMURI(zkPort, "");
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        List<String> pathsToDelete = Lists.newArrayList(
-                logRootPath + ALLOCATION_PATH);
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataMissingAllPath() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
-        List<String> pathsToDelete = Lists.newArrayList(
-                logRootPath + LOGSEGMENTS_PATH,
-                logRootPath + MAX_TXID_PATH,
-                logRootPath + LOCK_PATH,
-                logRootPath + READ_LOCK_PATH,
-                logRootPath + VERSION_PATH,
-                logRootPath + ALLOCATION_PATH);
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataOnExistedLog() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        List<String> pathsToDelete = Lists.newArrayList();
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadata() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        List<String> pathsToDelete = Lists.newArrayList();
-
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
-    }
-
-    @Test(timeout = 60000, expected = LogNotFoundException.class)
-    public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        FutureUtils.result(ZKLogMetadataForWriter.of(uri, logName, logIdentifier,
-                        zkc.get(), zkc.getDefaultACL(), true, false));
-    }
-
-    @Test(timeout = 60000)
-    public void testCreateLogMetadataWithCustomMetadata() throws Exception {
-        String logName = testName.getMethodName();
-        String logIdentifier = "<default>";
-        List<String> pathsToDelete = Lists.newArrayList();
-
-        DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(uri);
-
-        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
-            .conf(new DistributedLogConfiguration())
-            .uri(uri)
-            .build();
-
-        DistributedLogManager dlm = namespace.openLog(logName);
-        dlm.createOrUpdateMetadata(logName.getBytes("UTF-8"));
-        dlm.close();
-
-        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
deleted file mode 100644
index ce67c30..0000000
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogMetadataForWriterUtilFunctions.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.impl.metadata;
-
-import com.google.common.collect.Lists;
-import com.twitter.distributedlog.DLMTestUtil;
-import com.twitter.distributedlog.exceptions.UnexpectedException;
-import com.twitter.distributedlog.util.DLUtils;
-import org.apache.bookkeeper.meta.ZkVersion;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.junit.Test;
-
-import java.net.URI;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-public class TestZKLogMetadataForWriterUtilFunctions {
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000, expected = UnexpectedException.class)
-    public void testProcessLogMetadatasMissingMaxTxnId() throws Exception {
-        String rootPath = "/test-missing-max-txn-id";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000, expected = UnexpectedException.class)
-    public void testProcessLogMetadatasMissingVersion() throws Exception {
-        String rootPath = "/test-missing-version";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000, expected = UnexpectedException.class)
-    public void testProcessLogMetadatasWrongVersion() throws Exception {
-        String rootPath = "/test-missing-version";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(9999), null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000, expected = UnexpectedException.class)
-    public void testProcessLogMetadatasMissingLockPath() throws Exception {
-        String rootPath = "/test-missing-version";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000, expected = UnexpectedException.class)
-    public void testProcessLogMetadatasMissingReadLockPath() throws Exception {
-        String rootPath = "/test-missing-version";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000, expected = UnexpectedException.class)
-    public void testProcessLogMetadatasMissingLogSegmentsPath() throws Exception {
-        String rootPath = "/test-missing-version";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000, expected = UnexpectedException.class)
-    public void testProcessLogMetadatasMissingAllocatorPath() throws Exception {
-        String rootPath = "/test-missing-version";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)),
-                new Versioned<byte[]>(null, null));
-        ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000)
-    public void testProcessLogMetadatasNoAllocatorPath() throws Exception {
-        String rootPath = "/test-missing-version";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        Versioned<byte[]> maxTxnIdData =
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1));
-        Versioned<byte[]> logSegmentsData =
-                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1));
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                maxTxnIdData,
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                logSegmentsData);
-        ZKLogMetadataForWriter metadata =
-                ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
-        assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
-        assertTrue(logSegmentsData == metadata.getMaxLSSNData());
-        assertNull(metadata.getAllocationData().getValue());
-        assertNull(metadata.getAllocationData().getVersion());
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test(timeout = 60000)
-    public void testProcessLogMetadatasAllocatorPath() throws Exception {
-        String rootPath = "/test-missing-version";
-        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
-        String logName = "test-log";
-        String logIdentifier = "<default>";
-        Versioned<byte[]> maxTxnIdData =
-                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1));
-        Versioned<byte[]> logSegmentsData =
-                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1));
-        Versioned<byte[]> allocationData =
-                new Versioned<byte[]>(DLUtils.ledgerId2Bytes(1L), new ZkVersion(1));
-        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
-                new Versioned<byte[]>(null, null),
-                new Versioned<byte[]>(null, null),
-                maxTxnIdData,
-                new Versioned<byte[]>(ZKLogMetadataForWriter.intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
-                logSegmentsData,
-                allocationData);
-        ZKLogMetadataForWriter metadata =
-                ZKLogMetadataForWriter.processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
-        assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
-        assertTrue(logSegmentsData == metadata.getMaxLSSNData());
-        assertTrue(allocationData == metadata.getAllocationData());
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
new file mode 100644
index 0000000..9a08aa0
--- /dev/null
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.twitter.distributedlog.TestZooKeeperClientBuilder;
+import com.twitter.distributedlog.metadata.BKDLConfig;
+import com.twitter.distributedlog.metadata.DLMetadata;
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DLMTestUtil;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.namespace.DistributedLogNamespaceBuilder;
+import com.twitter.distributedlog.DistributedLogManager;
+import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.ZooKeeperClusterTestCase;
+import com.twitter.distributedlog.util.DLUtils;
+import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.Utils;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Transaction;
+import org.apache.zookeeper.ZooDefs;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.util.List;
+
+import static com.twitter.distributedlog.impl.metadata.ZKLogMetadata.*;
+import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
+import static org.junit.Assert.*;
+
+/**
+ * Test {@link ZKLogStreamMetadataStore}
+ */
+public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
+
+    private static final Logger logger = LoggerFactory.getLogger(TestZKLogStreamMetadataStore.class);
+
+    private final static int sessionTimeoutMs = 30000;
+
+    @Rule
+    public TestName testName = new TestName();
+
+    private ZooKeeperClient zkc;
+    private URI uri;
+
+    private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier)
+            throws Exception {
+        final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
+        final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
+        final String lockPath = logRootPath + LOCK_PATH;
+        final String readLockPath = logRootPath + READ_LOCK_PATH;
+        final String versionPath = logRootPath + VERSION_PATH;
+        final String allocationPath = logRootPath + ALLOCATION_PATH;
+
+        Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0],
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        Transaction txn = zk.get().transaction();
+        txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(
+                        DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(versionPath, intToBytes(LAYOUT_VERSION),
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
+                zk.getDefaultACL(), CreateMode.PERSISTENT);
+        txn.commit();
+    }
+
+    @Before
+    public void setup() throws Exception {
+        zkc = TestZooKeeperClientBuilder.newBuilder()
+                .name("zkc")
+                .uri(DLMTestUtil.createDLMURI(zkPort, "/"))
+                .sessionTimeoutMs(sessionTimeoutMs)
+                .build();
+        uri = DLMTestUtil.createDLMURI(zkPort, "");
+        try {
+            ZkUtils.createFullPathOptimistic(
+                    zkc.get(),
+                    uri.getPath(),
+                    new byte[0],
+                    ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException nee) {
+            logger.debug("The namespace uri already exists.");
+        }
+    }
+
+    @After
+    public void teardown() throws Exception {
+        zkc.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckLogMetadataPathsWithAllocator() throws Exception {
+        String logRootPath = "/" + testName.getMethodName();
+        List<Versioned<byte[]>> metadatas =
+                FutureUtils.result(checkLogMetadataPaths(
+                        zkc.get(), logRootPath, true));
+        assertEquals("Should have 8 paths",
+                8, metadatas.size());
+        for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
+            assertNull(path.getValue());
+            assertNull(path.getVersion());
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testCheckLogMetadataPathsWithoutAllocator() throws Exception {
+        String logRootPath = "/" + testName.getMethodName();
+        List<Versioned<byte[]>> metadatas =
+                FutureUtils.result(checkLogMetadataPaths(
+                        zkc.get(), logRootPath, false));
+        assertEquals("Should have 7 paths",
+                7, metadatas.size());
+        for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
+            assertNull(path.getValue());
+            assertNull(path.getVersion());
+        }
+    }
+
+    private void testCreateLogMetadataWithMissingPaths(URI uri,
+                                                       String logName,
+                                                       String logIdentifier,
+                                                       List<String> pathsToDelete,
+                                                       boolean ownAllocator,
+                                                       boolean createLogFirst)
+            throws Exception {
+        if (createLogFirst) {
+            createLog(zkc, uri, logName, logIdentifier);
+        }
+        // delete a path
+        for (String path : pathsToDelete) {
+            zkc.get().delete(path, -1);
+        }
+
+        ZKLogMetadataForWriter logMetadata =
+                FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, ownAllocator, true));
+
+        final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+
+        List<Versioned<byte[]>> metadatas =
+                FutureUtils.result(checkLogMetadataPaths(zkc.get(), logRootPath, ownAllocator));
+
+        if (ownAllocator) {
+            assertEquals("Should have 8 paths : ownAllocator = " + ownAllocator,
+                    8, metadatas.size());
+        } else {
+            assertEquals("Should have 7 paths : ownAllocator = " + ownAllocator,
+                    7, metadatas.size());
+        }
+
+        for (Versioned<byte[]> metadata : metadatas) {
+            assertTrue(pathExists(metadata));
+            assertTrue(((ZkVersion) metadata.getVersion()).getZnodeVersion() >= 0);
+        }
+
+        Versioned<byte[]> logSegmentsData = logMetadata.getMaxLSSNData();
+
+        assertEquals(DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO,
+                DLUtils.deserializeLogSegmentSequenceNumber(logSegmentsData.getValue()));
+
+        Versioned<byte[]> maxTxIdData = logMetadata.getMaxTxIdData();
+
+        assertEquals(0L, DLUtils.deserializeTransactionId(maxTxIdData.getValue()));
+
+        if (ownAllocator) {
+            Versioned<byte[]> allocationData = logMetadata.getAllocationData();
+            assertEquals(0, allocationData.getValue().length);
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingLogSegmentsPath() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        List<String> pathsToDelete = Lists.newArrayList(
+                logRootPath + LOGSEGMENTS_PATH);
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingMaxTxIdPath() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        List<String> pathsToDelete = Lists.newArrayList(
+                logRootPath + MAX_TXID_PATH);
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingLockPath() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        List<String> pathsToDelete = Lists.newArrayList(
+                logRootPath + LOCK_PATH);
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingReadLockPath() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        List<String> pathsToDelete = Lists.newArrayList(
+                logRootPath + READ_LOCK_PATH);
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingVersionPath() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        List<String> pathsToDelete = Lists.newArrayList(
+                logRootPath + VERSION_PATH);
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, false, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingAllocatorPath() throws Exception {
+        URI uri = DLMTestUtil.createDLMURI(zkPort, "");
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        List<String> pathsToDelete = Lists.newArrayList(
+                logRootPath + ALLOCATION_PATH);
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataMissingAllPath() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+        List<String> pathsToDelete = Lists.newArrayList(
+                logRootPath + LOGSEGMENTS_PATH,
+                logRootPath + MAX_TXID_PATH,
+                logRootPath + LOCK_PATH,
+                logRootPath + READ_LOCK_PATH,
+                logRootPath + VERSION_PATH,
+                logRootPath + ALLOCATION_PATH);
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataOnExistedLog() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        List<String> pathsToDelete = Lists.newArrayList();
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadata() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        List<String> pathsToDelete = Lists.newArrayList();
+
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
+    }
+
+    @Test(timeout = 60000, expected = LogNotFoundException.class)
+    public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        FutureUtils.result(getLog(uri, logName, logIdentifier, zkc, true, false));
+    }
+
+    @Test(timeout = 60000)
+    public void testCreateLogMetadataWithCustomMetadata() throws Exception {
+        String logName = testName.getMethodName();
+        String logIdentifier = "<default>";
+        List<String> pathsToDelete = Lists.newArrayList();
+
+        DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(uri);
+
+        DistributedLogNamespace namespace = DistributedLogNamespaceBuilder.newBuilder()
+            .conf(new DistributedLogConfiguration())
+            .uri(uri)
+            .build();
+
+        DistributedLogManager dlm = namespace.openLog(logName);
+        dlm.createOrUpdateMetadata(logName.getBytes("UTF-8"));
+        dlm.close();
+
+        testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
new file mode 100644
index 0000000..f14a217
--- /dev/null
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/metadata/TestZKLogStreamMetadataStoreUtils.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.impl.metadata;
+
+import com.google.common.collect.Lists;
+import com.twitter.distributedlog.DLMTestUtil;
+import com.twitter.distributedlog.exceptions.UnexpectedException;
+import com.twitter.distributedlog.util.DLUtils;
+import org.apache.bookkeeper.meta.ZkVersion;
+import org.apache.bookkeeper.versioning.Versioned;
+import org.junit.Test;
+
+import java.net.URI;
+import java.util.List;
+
+import static com.twitter.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
+import static org.junit.Assert.*;
+
+public class TestZKLogStreamMetadataStoreUtils {
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingMaxTxnId() throws Exception {
+        String rootPath = "/test-missing-max-txn-id";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null));
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingVersion() throws Exception {
+        String rootPath = "/test-missing-version";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(null, null));
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasWrongVersion() throws Exception {
+        String rootPath = "/test-missing-version";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(intToBytes(9999), null));
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingLockPath() throws Exception {
+        String rootPath = "/test-missing-version";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(null, null));
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingReadLockPath() throws Exception {
+        String rootPath = "/test-missing-version";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(null, null));
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingLogSegmentsPath() throws Exception {
+        String rootPath = "/test-missing-version";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(null, null));
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000, expected = UnexpectedException.class)
+    public void testProcessLogMetadatasMissingAllocatorPath() throws Exception {
+        String rootPath = "/test-missing-version";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1)),
+                new Versioned<byte[]>(null, null));
+        processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000)
+    public void testProcessLogMetadatasNoAllocatorPath() throws Exception {
+        String rootPath = "/test-missing-version";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        Versioned<byte[]> maxTxnIdData =
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1));
+        Versioned<byte[]> logSegmentsData =
+                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1));
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                maxTxnIdData,
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                logSegmentsData);
+        ZKLogMetadataForWriter metadata =
+                processLogMetadatas(uri, logName, logIdentifier, metadatas, false);
+        assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
+        assertTrue(logSegmentsData == metadata.getMaxLSSNData());
+        assertNull(metadata.getAllocationData().getValue());
+        assertNull(metadata.getAllocationData().getVersion());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(timeout = 60000)
+    public void testProcessLogMetadatasAllocatorPath() throws Exception {
+        String rootPath = "/test-missing-version";
+        URI uri = DLMTestUtil.createDLMURI(2181, rootPath);
+        String logName = "test-log";
+        String logIdentifier = "<default>";
+        Versioned<byte[]> maxTxnIdData =
+                new Versioned<byte[]>(DLUtils.serializeTransactionId(1L), new ZkVersion(1));
+        Versioned<byte[]> logSegmentsData =
+                new Versioned<byte[]>(DLUtils.serializeLogSegmentSequenceNumber(1L), new ZkVersion(1));
+        Versioned<byte[]> allocationData =
+                new Versioned<byte[]>(DLUtils.ledgerId2Bytes(1L), new ZkVersion(1));
+        List<Versioned<byte[]>> metadatas = Lists.newArrayList(
+                new Versioned<byte[]>(null, null),
+                new Versioned<byte[]>(null, null),
+                maxTxnIdData,
+                new Versioned<byte[]>(intToBytes(ZKLogMetadata.LAYOUT_VERSION), null),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                new Versioned<byte[]>(new byte[0], new ZkVersion(1)),
+                logSegmentsData,
+                allocationData);
+        ZKLogMetadataForWriter metadata =
+                processLogMetadatas(uri, logName, logIdentifier, metadatas, true);
+        assertTrue(maxTxnIdData == metadata.getMaxTxIdData());
+        assertTrue(logSegmentsData == metadata.getMaxLSSNData());
+        assertTrue(allocationData == metadata.getAllocationData());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b91d49a8/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
index 8899c0e..db87a65 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/util/TestPermitManager.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog.util;
 
+import com.twitter.distributedlog.zk.LimitedPermitManager;
 import org.junit.Test;
 
 import java.util.ArrayList;