You are viewing a plain text version of this content. The canonical link for it is here.
Posted to distributedlog-issues@bookkeeper.apache.org by GitBox <gi...@apache.org> on 2017/10/24 11:36:06 UTC

[GitHub] jiazhai closed pull request #228: Issue 225: Create log should create missing path components

jiazhai closed pull request #228: Issue 225: Create log should create missing path components
URL: https://github.com/apache/distributedlog/pull/228
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub does not otherwise display the original diff once such a pull request is merged):

diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
index b3250fa2..c046fc62 100644
--- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
+++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/metadata/ZKLogStreamMetadataStore.java
@@ -35,7 +35,7 @@
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
+import org.apache.bookkeeper.common.concurrent.FutureEventListener;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.versioning.LongVersion;
 import org.apache.bookkeeper.versioning.Versioned;
@@ -363,6 +363,7 @@ static void ensureMetadataExist(Versioned<byte[]> metadata) {
     }
 
     static void createMissingMetadata(final ZooKeeper zk,
+                                      final String basePath,
                                       final String logRootPath,
                                       final List<Versioned<byte[]>> metadatas,
                                       final List<ACL> acl,
@@ -374,10 +375,10 @@ static void createMissingMetadata(final ZooKeeper zk,
         CreateMode createMode = CreateMode.PERSISTENT;
 
         // log root parent path
+        String logRootParentPath = Utils.getParent(logRootPath);
         if (pathExists(metadatas.get(MetadataIndex.LOG_ROOT_PARENT))) {
             pathsToCreate.add(null);
         } else {
-            String logRootParentPath = Utils.getParent(logRootPath);
             pathsToCreate.add(EMPTY_BYTES);
             zkOps.add(Op.create(logRootParentPath, EMPTY_BYTES, acl, createMode));
         }
@@ -425,7 +426,7 @@ static void createMissingMetadata(final ZooKeeper zk,
             pathsToCreate.add(null);
         } else {
             byte[] logSegmentsData = DLUtils.serializeLogSegmentSequenceNumber(
-                    DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
+                DistributedLogConstants.UNASSIGNED_LOGSEGMENT_SEQNO);
             pathsToCreate.add(logSegmentsData);
             zkOps.add(Op.create(logRootPath + LOGSEGMENTS_PATH, logSegmentsData, acl, createMode));
         }
@@ -436,7 +437,7 @@ static void createMissingMetadata(final ZooKeeper zk,
             } else {
                 pathsToCreate.add(EMPTY_BYTES);
                 zkOps.add(Op.create(logRootPath + ALLOCATION_PATH,
-                        EMPTY_BYTES, acl, createMode));
+                    EMPTY_BYTES, acl, createMode));
             }
         }
         if (zkOps.isEmpty()) {
@@ -449,6 +450,41 @@ static void createMissingMetadata(final ZooKeeper zk,
             return;
         }
 
+        getMissingPaths(zk, basePath, Utils.getParent(logRootParentPath))
+            .whenComplete(new FutureEventListener<List<String>>() {
+                @Override
+                public void onSuccess(List<String> paths) {
+                    for (String path : paths) {
+                        pathsToCreate.add(EMPTY_BYTES);
+                        zkOps.add(
+                            0, Op.create(path, EMPTY_BYTES, acl, createMode));
+                    }
+                    executeCreateMissingPathTxn(
+                        zk,
+                        zkOps,
+                        pathsToCreate,
+                        metadatas,
+                        logRootPath,
+                        promise
+                    );
+                }
+
+                @Override
+                public void onFailure(Throwable cause) {
+                    promise.completeExceptionally(cause);
+                    return;
+                }
+            });
+
+    }
+
+    private static void executeCreateMissingPathTxn(ZooKeeper zk,
+                                                    List<Op> zkOps,
+                                                    List<byte[]> pathsToCreate,
+                                                    List<Versioned<byte[]>> metadatas,
+                                                    String logRootPath,
+                                                    CompletableFuture<List<Versioned<byte[]>>> promise) {
+
         zk.multi(zkOps, new AsyncCallback.MultiCallback() {
             @Override
             public void processResult(int rc, String path, Object ctx, List<OpResult> resultList) {
@@ -549,29 +585,30 @@ static LogMetadataForWriter processLogMetadatas(URI uri,
         try {
             final ZooKeeper zk = zooKeeperClient.get();
             return checkLogMetadataPaths(zk, logRootPath, ownAllocator)
-                    .thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<List<Versioned<byte[]>>>>() {
-                        @Override
-                        public CompletableFuture<List<Versioned<byte[]>>> apply(List<Versioned<byte[]>> metadatas) {
-                            CompletableFuture<List<Versioned<byte[]>>> promise =
-                                    new CompletableFuture<List<Versioned<byte[]>>>();
-                            createMissingMetadata(zk, logRootPath, metadatas, zooKeeperClient.getDefaultACL(),
-                                    ownAllocator, createIfNotExists, promise);
-                            return promise;
-                        }
-                    }).thenCompose(new Function<List<Versioned<byte[]>>, CompletableFuture<LogMetadataForWriter>>() {
-                        @Override
-                        public CompletableFuture<LogMetadataForWriter> apply(List<Versioned<byte[]>> metadatas) {
-                            try {
-                                return FutureUtils.value(
-                                    processLogMetadatas(
-                                        uri,
-                                        logName,
-                                        logIdentifier,
-                                        metadatas,
-                                        ownAllocator));
-                            } catch (UnexpectedException e) {
-                                return FutureUtils.exception(e);
-                            }
+                    .thenCompose(metadatas -> {
+                        CompletableFuture<List<Versioned<byte[]>>> promise =
+                                new CompletableFuture<List<Versioned<byte[]>>>();
+                        createMissingMetadata(
+                            zk,
+                            uri.getPath(),
+                            logRootPath,
+                            metadatas,
+                            zooKeeperClient.getDefaultACL(),
+                            ownAllocator,
+                            createIfNotExists,
+                            promise);
+                        return promise;
+                    }).thenCompose(metadatas -> {
+                        try {
+                            return FutureUtils.value(
+                                processLogMetadatas(
+                                    uri,
+                                    logName,
+                                    logIdentifier,
+                                    metadatas,
+                                    ownAllocator));
+                        } catch (UnexpectedException e) {
+                            return FutureUtils.exception(e);
                         }
                     });
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
@@ -749,16 +786,22 @@ public void processResult(int rc, String path, Object ctx) {
 
     @VisibleForTesting
     static CompletableFuture<List<String>> getMissingPaths(ZooKeeperClient zkc, URI uri, String logName) {
+        ZooKeeper zk;
+        try {
+            zk = zkc.get();
+        } catch (ZooKeeperConnectionException | InterruptedException e) {
+            return FutureUtils.exception(e);
+        }
         String basePath = uri.getPath();
         String logStreamPath = LogMetadata.getLogStreamPath(uri, logName);
-        LinkedList<String> missingPaths = Lists.newLinkedList();
+        return getMissingPaths(zk, basePath, logStreamPath);
+    }
 
+    @VisibleForTesting
+    static CompletableFuture<List<String>> getMissingPaths(ZooKeeper zk, String basePath, String logStreamPath) {
+        LinkedList<String> missingPaths = Lists.newLinkedList();
         CompletableFuture<List<String>> future = FutureUtils.createFuture();
-        try {
-            existPath(zkc.get(), logStreamPath, basePath, missingPaths, future);
-        } catch (ZooKeeperConnectionException | InterruptedException e) {
-            future.completeExceptionally(e);
-        }
+        existPath(zk, logStreamPath, basePath, missingPaths, future);
         return future;
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services