You are viewing a plain text version of this content. The canonical link for it is here.
Posted to distributedlog-commits@bookkeeper.apache.org by zh...@apache.org on 2017/10/23 09:14:12 UTC
[distributedlog] branch master updated: ISSUE #209: Support rename
log
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/distributedlog.git
The following commit(s) were added to refs/heads/master by this push:
new b670382 ISSUE #209: Support rename log
b670382 is described below
commit b6703827a9df5494a2d7c75d367ce256c1da3121
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Oct 23 17:14:02 2017 +0800
ISSUE #209: Support rename log
Descriptions of the changes in this PR:
- add rename operation in `Namespace`
- add rename operation in `LogStreamMetadataStore`
- implement the rename operation use zookeeper `multi` operation
Author: Sijie Guo <si...@apache.org>
Author: Shoukun Huai <sh...@gmail.com>
Author: Arvin <ar...@gmail.com>
Reviewers: Jia Zhai <None>
This closes #210 from sijie/4_support_rename_pr, closes #209
---
.../distributedlog/BKDistributedLogNamespace.java | 24 ++
.../distributedlog/api/namespace/Namespace.java | 14 +
.../impl/metadata/ZKLogStreamMetadataStore.java | 302 ++++++++++++++++++++-
.../metadata/LogStreamMetadataStore.java | 12 +
.../metadata/TestZKLogStreamMetadataStore.java | 262 +++++++++++++++++-
5 files changed, 601 insertions(+), 13 deletions(-)
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
index cd5a17a..d2e2169 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKDistributedLogNamespace.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.feature.FeatureProvider;
@@ -33,6 +34,7 @@ import org.apache.distributedlog.acl.AccessControlManager;
import org.apache.distributedlog.api.DistributedLogManager;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.callback.NamespaceListener;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.util.PermitLimiter;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -194,6 +196,28 @@ public class BKDistributedLogNamespace implements Namespace {
}
@Override
+ public CompletableFuture<Void> renameLog(String oldName, String newName) {
+ try {
+ checkState();
+ final String oldLogName = validateAndNormalizeName(oldName);
+ final String newLogName = validateAndNormalizeName(newName);
+
+ return driver.getLogMetadataStore().getLogLocation(oldName)
+ .thenCompose(uriOptional -> {
+ if (uriOptional.isPresent()) {
+ return driver.getLogStreamMetadataStore(WRITER)
+ .renameLog(uriOptional.get(), oldLogName, newLogName);
+ } else {
+ return FutureUtils.exception(
+ new LogNotFoundException("Log " + oldLogName + " isn't found."));
+ }
+ });
+ } catch (IOException ioe) {
+ return FutureUtils.exception(ioe);
+ }
+ }
+
+ @Override
public boolean logExists(String logName)
throws IOException, IllegalArgumentException {
checkState();
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
index dafc099..712295d 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/api/namespace/Namespace.java
@@ -20,6 +20,7 @@ package org.apache.distributedlog.api.namespace;
import java.io.IOException;
import java.util.Iterator;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.common.annotation.InterfaceAudience.Public;
import org.apache.bookkeeper.common.annotation.InterfaceStability.Evolving;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -107,6 +108,19 @@ public interface Namespace extends AutoCloseable{
throws InvalidStreamNameException, LogNotFoundException, IOException;
/**
+ * Rename a log from <i>oldName</i> to <i>newName</i>.
+ *
+ * @param oldName old log name
+ * @param newName new log name
+ * @return a future represents the rename result.
+ * @throws InvalidStreamNameException if log name is invalid
+ * @throws LogNotFoundException if old log doesn't exist
+ * @throws org.apache.distributedlog.exceptions.LogExistsException if the new log exists
+ * @throws IOException when encountered issues with backend.
+ */
+ CompletableFuture<Void> renameLog(String oldName, String newName);
+
+ /**
* Open a log named <i>logName</i>.
* A distributedlog manager is returned to access log <i>logName</i>.
*
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index 3c55edc..b3250fa 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -17,14 +17,20 @@
*/
package org.apache.distributedlog.impl.metadata;
+import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.distributedlog.DistributedLogConstants.EMPTY_BYTES;
+import static org.apache.distributedlog.DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO;
import static org.apache.distributedlog.metadata.LogMetadata.*;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
+import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
@@ -35,13 +41,16 @@ import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.ZooKeeperClient;
+import org.apache.distributedlog.ZooKeeperClient.ZooKeeperConnectionException;
import org.apache.distributedlog.common.concurrent.FutureUtils;
import org.apache.distributedlog.common.util.PermitManager;
import org.apache.distributedlog.common.util.SchedulerUtils;
import org.apache.distributedlog.exceptions.DLInterruptedException;
import org.apache.distributedlog.exceptions.InvalidStreamNameException;
import org.apache.distributedlog.exceptions.LockCancelledException;
+import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.UnexpectedException;
@@ -65,7 +74,10 @@ import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Op;
+import org.apache.zookeeper.Op.Create;
+import org.apache.zookeeper.Op.Delete;
import org.apache.zookeeper.OpResult;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
@@ -366,16 +378,16 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
pathsToCreate.add(null);
} else {
String logRootParentPath = Utils.getParent(logRootPath);
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootParentPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ pathsToCreate.add(EMPTY_BYTES);
+ zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
}
// log root path
if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT))) {
pathsToCreate.add(null);
} else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ pathsToCreate.add(EMPTY_BYTES);
+ zkOps.add(Op.create(logRootPath, EMPTY_BYTES, acl, createMode));
}
// max id
@@ -398,15 +410,15 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
if (pathExists(metadatas.get(MetadataIndex.LOCK))) {
pathsToCreate.add(null);
} else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath + LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ pathsToCreate.add(EMPTY_BYTES);
+ zkOps.add(Op.create(logRootPath + LOCK_PATH, EMPTY_BYTES, acl, createMode));
}
// read lock path
if (pathExists(metadatas.get(MetadataIndex.READ_LOCK))) {
pathsToCreate.add(null);
} else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
- zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ pathsToCreate.add(EMPTY_BYTES);
+ zkOps.add(Op.create(logRootPath + READ_LOCK_PATH, EMPTY_BYTES, acl, createMode));
}
// log segments path
if (pathExists(metadatas.get(MetadataIndex.LOGSEGMENTS))) {
@@ -422,9 +434,9 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
if (pathExists(metadatas.get(MetadataIndex.ALLOCATION))) {
pathsToCreate.add(null);
} else {
- pathsToCreate.add(DistributedLogConstants.EMPTY_BYTES);
+ pathsToCreate.add(EMPTY_BYTES);
zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
- DistributedLogConstants.EMPTY_BYTES, acl, createMode));
+ EMPTY_BYTES, acl, createMode));
}
}
if (zkOps.isEmpty()) {
@@ -620,4 +632,274 @@ public class ZKLogStreamMetadataStore implements LogStreamMetadataStore {
}
return promise;
}
+
+ //
+ // Rename Log
+ //
+
+ @Override
+ public CompletableFuture<Void> renameLog(URI uri, String oldStreamName, String newStreamName) {
+ return getLog(
+ uri,
+ oldStreamName,
+ true,
+ false
+ ).thenCompose(metadata -> renameLogMetadata(uri, metadata, newStreamName));
+ }
+
+ private CompletableFuture<Void> renameLogMetadata(URI uri,
+ LogMetadataForWriter oldMetadata,
+ String newStreamName) {
+
+
+ final LinkedList<Op> createOps = Lists.newLinkedList();
+ final LinkedList<Op> deleteOps = Lists.newLinkedList();
+
+ List<ACL> acls = zooKeeperClient.getDefaultACL();
+
+ // get the root path
+ String oldRootPath = oldMetadata.getLogRootPath();
+ String newRootPath = LogMetadata.getLogRootPath(
+ uri, newStreamName, conf.getUnpartitionedStreamName());
+
+ // 0. the log path
+ deleteOps.addFirst(Op.delete(
+ LogMetadata.getLogStreamPath(uri, oldMetadata.getLogName()), -1));
+
+ // 1. the root path
+ createOps.addLast(Op.create(
+ newRootPath, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+ deleteOps.addFirst(Op.delete(
+ oldRootPath, -1));
+
+ // 2. max id
+ Versioned<byte[]> maxTxIdData = oldMetadata.getMaxTxIdData();
+ deleteOldPathAndCreateNewPath(
+ oldRootPath, MAX_TXID_PATH, maxTxIdData,
+ newRootPath, DLUtils.serializeTransactionId(0L), acls,
+ createOps, deleteOps
+ );
+
+ // 3. version
+ createOps.addLast(Op.create(
+ newRootPath + VERSION_PATH, intToBytes(LAYOUT_VERSION), acls, CreateMode.PERSISTENT));
+ deleteOps.addFirst(Op.delete(
+ oldRootPath + VERSION_PATH, -1));
+
+ // 4. lock path (NOTE: if the stream is locked by a writer, then the delete will fail as you can not
+ // delete the lock path if children is not empty.
+ createOps.addLast(Op.create(
+ newRootPath + LOCK_PATH, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+ deleteOps.addFirst(Op.delete(
+ oldRootPath + LOCK_PATH, -1));
+
+ // 5. read lock path (NOTE: same reason as the write lock)
+ createOps.addLast(Op.create(
+ newRootPath + READ_LOCK_PATH, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+ deleteOps.addFirst(Op.delete(
+ oldRootPath + READ_LOCK_PATH, -1));
+
+ // 6. allocation path
+ Versioned<byte[]> allocationData = oldMetadata.getAllocationData();
+ deleteOldPathAndCreateNewPath(
+ oldRootPath, ALLOCATION_PATH, allocationData,
+ newRootPath, EMPTY_BYTES, acls,
+ createOps, deleteOps);
+
+ // 7. log segments
+ Versioned<byte[]> maxLSSNData = oldMetadata.getMaxLSSNData();
+ deleteOldPathAndCreateNewPath(
+ oldRootPath, LOGSEGMENTS_PATH, maxLSSNData,
+ newRootPath, DLUtils.serializeLogSegmentSequenceNumber(UNASSIGNED_LOGSEGMENT_SEQNO), acls,
+ createOps, deleteOps);
+
+ // 8. copy the log segments
+ CompletableFuture<List<LogSegmentMetadata>> segmentsFuture;
+ if (pathExists(maxLSSNData)) {
+ segmentsFuture = getLogSegments(zooKeeperClient, oldRootPath + LOGSEGMENTS_PATH);
+ } else {
+ segmentsFuture = FutureUtils.value(Collections.emptyList());
+ }
+ return segmentsFuture
+ // copy the segments
+ .thenApply(segments -> {
+ for (LogSegmentMetadata segment : segments) {
+ deleteOldSegmentAndCreateNewSegment(
+ segment,
+ newRootPath + LOGSEGMENTS_PATH,
+ acls,
+ createOps,
+ deleteOps);
+ }
+ return null;
+ })
+ // get the missing paths
+ .thenCompose(ignored ->
+ getMissingPaths(zooKeeperClient, uri, newStreamName)
+ )
+ // create the missing paths and execute the rename transaction
+ .thenCompose(paths -> {
+ for (String path : paths) {
+ createOps.addFirst(Op.create(
+ path, EMPTY_BYTES, acls, CreateMode.PERSISTENT));
+ }
+ return executeRenameTxn(oldRootPath, newRootPath, createOps, deleteOps);
+ });
+ }
+
+ @VisibleForTesting
+ static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
+ String basePath = uri.getPath();
+ String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
+ LinkedList<String> missingPaths = Lists.newLinkedList();
+
+ CompletableFuture<List<String>> future = FutureUtils.createFuture();
+ try {
+ existPath(zkc.get(), logStreamPath, basePath, missingPaths, future);
+ } catch (ZooKeeperConnectionException | InterruptedException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ private static void existPath(ZooKeeper zk,
+ String path,
+ String basePath,
+ LinkedList<String> missingPaths,
+ CompletableFuture<List<String>> future) {
+ if (basePath.equals(path)) {
+ future.complete(missingPaths);
+ return;
+ }
+ zk.exists(path, false, (rc, path1, ctx, stat) -> {
+ if (Code.OK.intValue() != rc && Code.NONODE.intValue() != rc) {
+ future.completeExceptionally(new ZKException("Failed to check existence of path " + path1,
+ Code.get(rc)));
+ return;
+ }
+
+ if (Code.OK.intValue() == rc) {
+ future.complete(missingPaths);
+ return;
+ }
+
+ missingPaths.addLast(path);
+ String parentPath = Utils.getParent(path);
+ existPath(zk, parentPath, basePath, missingPaths, future);
+ }, null);
+ }
+
+ private CompletableFuture<Void> executeRenameTxn(String oldLogPath,
+ String newLogPath,
+ LinkedList<Op> createOps,
+ LinkedList<Op> deleteOps) {
+ CompletableFuture<Void> future = FutureUtils.createFuture();
+ List<Op> zkOps = Lists.newArrayListWithExpectedSize(createOps.size() + deleteOps.size());
+ zkOps.addAll(createOps);
+ zkOps.addAll(deleteOps);
+
+ if (LOG.isDebugEnabled()) {
+ for (Op op : zkOps) {
+ if (op instanceof Create) {
+ Create create = (Create) op;
+ LOG.debug("op : create {}", create.getPath());
+ } else if (op instanceof Delete) {
+ Delete delete = (Delete) op;
+ LOG.debug("op : delete {}, record = {}", delete.getPath(), op.toRequestRecord());
+ } else {
+ LOG.debug("op : {}", op);
+ }
+ }
+ }
+
+ try {
+ zooKeeperClient.get().multi(zkOps, (rc, path, ctx, opResults) -> {
+ if (Code.OK.intValue() == rc) {
+ future.complete(null);
+ } else if (Code.NODEEXISTS.intValue() == rc) {
+ future.completeExceptionally(new LogExistsException("Someone just created new log " + newLogPath));
+ } else if (Code.NOTEMPTY.intValue() == rc) {
+ future.completeExceptionally(new LockingException(oldLogPath + LOCK_PATH,
+ "Someone is holding a lock on log " + oldLogPath));
+ } else {
+ future.completeExceptionally(new ZKException("Failed to rename log "
+ + oldLogPath + " to " + newLogPath + " at path " + path, Code.get(rc)));
+ }
+ }, null);
+ } catch (ZooKeeperConnectionException e) {
+ future.completeExceptionally(e);
+ } catch (InterruptedException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
+ private static void deleteOldSegmentAndCreateNewSegment(LogSegmentMetadata oldMetadata,
+ String newSegmentsPath,
+ List<ACL> acls,
+ LinkedList<Op> createOps,
+ LinkedList<Op> deleteOps) {
+ createOps.addLast(Op.create(
+ newSegmentsPath + "/" + oldMetadata.getZNodeName(),
+ oldMetadata.getFinalisedData().getBytes(UTF_8),
+ acls,
+ CreateMode.PERSISTENT));
+ deleteOps.addFirst(Op.delete(
+ oldMetadata.getZkPath(),
+ -1));
+ }
+
+ private static void deleteOldPathAndCreateNewPath(String oldRootPath,
+ String nodePath,
+ Versioned<byte[]> pathData,
+ String newRootPath,
+ byte[] initData,
+ List<ACL> acls,
+ LinkedList<Op> createOps,
+ LinkedList<Op> deleteOps) {
+ if (pathExists(pathData)) {
+ createOps.addLast(Op.create(
+ newRootPath + nodePath, pathData.getValue(), acls, CreateMode.PERSISTENT));
+ deleteOps.addFirst(Op.delete(
+ oldRootPath + nodePath, (int) ((LongVersion) pathData.getVersion()).getLongVersion()));
+ } else {
+ createOps.addLast(Op.create(
+ newRootPath + nodePath, initData, acls, CreateMode.PERSISTENT));
+ }
+ }
+
+ @VisibleForTesting
+ static CompletableFuture<List<LogSegmentMetadata>> getLogSegments(ZooKeeperClient zk,
+ String logSegmentsPath) {
+ CompletableFuture<List<LogSegmentMetadata>> future = FutureUtils.createFuture();
+ try {
+ zk.get().getChildren(logSegmentsPath, false, (rc, path, ctx, children, stat) -> {
+ if (Code.OK.intValue() != rc) {
+ if (Code.NONODE.intValue() == rc) {
+ future.completeExceptionally(new LogNotFoundException("Log " + path + " not found"));
+ } else {
+ future.completeExceptionally(new ZKException("Failed to get log segments from " + path,
+ Code.get(rc)));
+ }
+ return;
+ }
+
+ // get all the segments
+ List<CompletableFuture<LogSegmentMetadata>> futures =
+ Lists.newArrayListWithExpectedSize(children.size());
+ for (String child : children) {
+ futures.add(LogSegmentMetadata.read(zk, logSegmentsPath + "/" + child));
+ }
+ FutureUtils.proxyTo(
+ FutureUtils.collect(futures),
+ future);
+ }, null);
+ } catch (ZooKeeperConnectionException e) {
+ future.completeExceptionally(e);
+ } catch (InterruptedException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
+
}
diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
index 41dc500..712619a 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/metadata/LogStreamMetadataStore.java
@@ -95,6 +95,18 @@ public interface LogStreamMetadataStore extends Closeable {
CompletableFuture<Void> deleteLog(URI uri, String streamName);
/**
+ * Rename the log from <i>oldStreamName</i> to <i>newStreamName</i>.
+ *
+ * @param uri the location to store the metadata of the log
+ * @param oldStreamName the old name of the log stream
+ * @param newStreamName the new name of the log stream
+ * @return future represents the result of the rename operation.
+ */
+ CompletableFuture<Void> renameLog(URI uri,
+ String oldStreamName,
+ String newStreamName);
+
+ /**
* Get the log segment metadata store.
*
* @return the log segment metadata store.
diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
index f1cec9d..4aa832a 100644
--- a/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/test/java/org/apache/distributedlog/impl/metadata/TestZKLogStreamMetadataStore.java
@@ -17,36 +17,60 @@
*/
package org.apache.distributedlog.impl.metadata;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.apache.distributedlog.DistributedLogConstants.EMPTY_BYTES;
import static org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore.*;
import static org.apache.distributedlog.metadata.LogMetadata.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import java.net.URI;
+import java.util.Collections;
import java.util.List;
+import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
+import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.apache.distributedlog.api.MetadataAccessor;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
+import org.apache.distributedlog.common.concurrent.FutureUtils;
+import org.apache.distributedlog.exceptions.LockingException;
+import org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
+import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.metadata.DLMetadata;
+import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.util.DLUtils;
+import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.Utils;
+import org.apache.zookeeper.AsyncCallback.Children2Callback;
+import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Transaction;
import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
@@ -69,6 +93,8 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
private ZooKeeperClient zkc;
private URI uri;
+ private OrderedScheduler scheduler;
+ private ZKLogStreamMetadataStore metadataStore;
private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier)
throws Exception {
@@ -88,17 +114,68 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
zk.getDefaultACL(), CreateMode.PERSISTENT);
txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES,
+ txn.create(lockPath, EMPTY_BYTES,
zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES,
+ txn.create(readLockPath, EMPTY_BYTES,
zk.getDefaultACL(), CreateMode.PERSISTENT);
txn.create(versionPath, intToBytes(LAYOUT_VERSION),
zk.getDefaultACL(), CreateMode.PERSISTENT);
- txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES,
+ txn.create(allocationPath, EMPTY_BYTES,
zk.getDefaultACL(), CreateMode.PERSISTENT);
txn.commit();
}
+ private static void createLog(ZooKeeperClient zk,
+ URI uri,
+ String logName,
+ String logIdentifier,
+ int numSegments)
+ throws Exception {
+ final String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+ final String logSegmentsPath = logRootPath + LOGSEGMENTS_PATH;
+ final String maxTxIdPath = logRootPath + MAX_TXID_PATH;
+ final String lockPath = logRootPath + LOCK_PATH;
+ final String readLockPath = logRootPath + READ_LOCK_PATH;
+ final String versionPath = logRootPath + VERSION_PATH;
+ final String allocationPath = logRootPath + ALLOCATION_PATH;
+
+ Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0],
+ zk.getDefaultACL(), CreateMode.PERSISTENT);
+ Transaction txn = zk.get().transaction();
+ txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(
+ DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO),
+ zk.getDefaultACL(), CreateMode.PERSISTENT);
+ txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L),
+ zk.getDefaultACL(), CreateMode.PERSISTENT);
+ txn.create(lockPath, EMPTY_BYTES,
+ zk.getDefaultACL(), CreateMode.PERSISTENT);
+ txn.create(readLockPath, EMPTY_BYTES,
+ zk.getDefaultACL(), CreateMode.PERSISTENT);
+ txn.create(versionPath, intToBytes(LAYOUT_VERSION),
+ zk.getDefaultACL(), CreateMode.PERSISTENT);
+ txn.create(allocationPath, EMPTY_BYTES,
+ zk.getDefaultACL(), CreateMode.PERSISTENT);
+
+ for (int i = 0; i < numSegments; i++) {
+ LogSegmentMetadata segment = DLMTestUtil.completedLogSegment(
+ logSegmentsPath,
+ i + 1L,
+ 1L + i * 1000L,
+ (i + 1) * 1000L,
+ 1000,
+ i + 1L,
+ 999L,
+ 0L);
+ txn.create(
+ segment.getZkPath(),
+ segment.getFinalisedData().getBytes(UTF_8),
+ zk.getDefaultACL(),
+ CreateMode.PERSISTENT);
+ }
+
+ txn.commit();
+ }
+
@Before
public void setup() throws Exception {
zkc = TestZooKeeperClientBuilder.newBuilder()
@@ -117,10 +194,26 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
} catch (KeeperException.NodeExistsException nee) {
logger.debug("The namespace uri already exists.");
}
+ scheduler = OrderedScheduler.newBuilder()
+ .name("test-scheduler")
+ .corePoolSize(1)
+ .build();
+ metadataStore = new ZKLogStreamMetadataStore(
+ "test-logstream-metadata-store",
+ new DistributedLogConfiguration(),
+ zkc,
+ scheduler,
+ NullStatsLogger.INSTANCE);
}
@After
public void teardown() throws Exception {
+ if (null != metadataStore) {
+ metadataStore.close();
+ }
+ if (null != scheduler) {
+ scheduler.shutdown();
+ }
zkc.close();
}
@@ -323,4 +416,167 @@ public class TestZKLogStreamMetadataStore extends ZooKeeperClusterTestCase {
testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, false);
}
+ @Test(timeout = 60000, expected = LogNotFoundException.class)
+ public void testGetLogSegmentsLogNotFound() throws Exception {
+ String logName = testName.getMethodName();
+ String logIdentifier = "<default>";
+
+ String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier);
+ FutureUtils.result(getLogSegments(zkc, logSegmentsPath));
+ }
+
+ @Test(timeout = 60000)
+ public void testGetLogSegmentsZKExceptions() throws Exception {
+ String logName = testName.getMethodName();
+ String logIdentifier = "<default>";
+
+ ZooKeeper mockZk = mock(ZooKeeper.class);
+ ZooKeeperClient mockZkc = mock(ZooKeeperClient.class);
+ when(mockZkc.get()).thenReturn(mockZk);
+ doAnswer(invocationOnMock -> {
+ String path = (String) invocationOnMock.getArguments()[0];
+ Children2Callback callback = (Children2Callback) invocationOnMock.getArguments()[2];
+ callback.processResult(Code.BADVERSION.intValue(), path, null, null, null);
+ return null;
+ }).when(mockZk).getChildren(anyString(), anyBoolean(), any(Children2Callback.class), anyObject());
+
+ String logSegmentsPath = LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier);
+ try {
+ FutureUtils.result(getLogSegments(mockZkc, logSegmentsPath));
+ fail("Should fail to get log segments when encountering zk exceptions");
+ } catch (ZKException zke) {
+ assertEquals(Code.BADVERSION, zke.getKeeperExceptionCode());
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testGetLogSegments() throws Exception {
+ String logName = testName.getMethodName();
+ String logIdentifier = "<default>";
+
+ // create log
+ createLog(
+ zkc,
+ uri,
+ logName,
+ logIdentifier,
+ 5);
+
+ List<LogSegmentMetadata> segments = FutureUtils.result(
+ getLogSegments(zkc, LogMetadata.getLogSegmentsPath(uri, logName, logIdentifier)));
+ assertEquals(5, segments.size());
+ for (int i = 0; i < 5; i++) {
+ assertEquals(1L + i, segments.get(i).getLogSegmentSequenceNumber());
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testGetMissingPathsRecursive() throws Exception {
+ List<String> missingPaths = FutureUtils.result(
+ getMissingPaths(zkc, uri, "path/to/log"));
+
+ assertEquals(
+ Lists.newArrayList(
+ uri.getPath() + "/path/to/log",
+ uri.getPath() + "/path/to",
+ uri.getPath() + "/path"
+ ),
+ missingPaths);
+ }
+
+ @Test(timeout = 60000)
+ public void testGetMissingPathsRecursive2() throws Exception {
+ String path = uri.getPath() + "/path/to/log";
+ ZkUtils.createFullPathOptimistic(
+ zkc.get(), path, EMPTY_BYTES, zkc.getDefaultACL(), CreateMode.PERSISTENT);
+
+ List<String> missingPaths = FutureUtils.result(
+ getMissingPaths(zkc, uri, "path/to/log"));
+
+ assertEquals(
+ Collections.emptyList(),
+ missingPaths);
+ }
+
+ @Test(timeout = 60000)
+ public void testGetMissingPathsFailure() throws Exception {
+ ZooKeeper mockZk = mock(ZooKeeper.class);
+ ZooKeeperClient mockZkc = mock(ZooKeeperClient.class);
+ when(mockZkc.get()).thenReturn(mockZk);
+ doAnswer(invocationOnMock -> {
+ String path = (String) invocationOnMock.getArguments()[0];
+ StatCallback callback = (StatCallback) invocationOnMock.getArguments()[2];
+ callback.processResult(Code.BADVERSION.intValue(), path, null, null);
+ return null;
+ }).when(mockZk).exists(anyString(), anyBoolean(), any(StatCallback.class), anyObject());
+
+ try {
+ FutureUtils.result(getMissingPaths(mockZkc, uri, "path/to/log"));
+ fail("Should fail on getting missing paths on zookeeper exceptions.");
+ } catch (ZKException zke) {
+ assertEquals(Code.BADVERSION, zke.getKeeperExceptionCode());
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testRenameLog() throws Exception {
+ String logName = testName.getMethodName();
+ String logIdentifier = "<default>";
+ int numSegments = 5;
+
+ createLog(
+ zkc,
+ uri,
+ logName,
+ logIdentifier,
+ numSegments);
+
+ String newLogName = "path/to/new/" + logName;
+ FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+ }
+
+ @Test(timeout = 60000, expected = LogExistsException.class)
+ public void testRenameLogExists() throws Exception {
+ String logName = testName.getMethodName();
+ String logIdentifier = "<default>";
+ int numSegments = 5;
+ createLog(
+ zkc,
+ uri,
+ logName,
+ logIdentifier,
+ numSegments);
+
+ String newLogName = "path/to/new/" + logName;
+ createLog(
+ zkc,
+ uri,
+ newLogName,
+ logIdentifier,
+ 3);
+
+ FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+ }
+
+ @Test(timeout = 60000, expected = LockingException.class)
+ public void testRenameLockedLog() throws Exception {
+ String logName = testName.getMethodName();
+ String logIdentifier = "<default>";
+ int numSegments = 5;
+ createLog(
+ zkc,
+ uri,
+ logName,
+ logIdentifier,
+ numSegments);
+
+ // create a lock
+ String logRootPath = getLogRootPath(uri, logName, logIdentifier);
+ String lockPath = logRootPath + LOCK_PATH;
+ zkc.get().create(lockPath + "/test", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+ String newLogName = "path/to/new/" + logName;
+ FutureUtils.result(metadataStore.renameLog(uri, logName, newLogName));
+ }
+
}
--
To stop receiving notification emails like this one, please contact
['"distributedlog-commits@bookkeeper.apache.org" <di...@bookkeeper.apache.org>'].