You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/14 14:36:57 UTC

[2/3] curator git commit: reworking so that this feature is more general. Now manages any set of transactions

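Reworked the migrations feature to be more general: a migration now supplies an arbitrary list of CuratorOp operations, and the manager applies them (together with the metadata records) in a transaction.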


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a15582b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a15582b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a15582b

Branch: refs/heads/CURATOR-421
Commit: 1a15582b59ccfaf80b1547fd57bcb7dfe894b1c8
Parents: 1a0c162
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 09:24:33 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 09:24:33 2017 -0500

----------------------------------------------------------------------
 .../InvalidMigrationSetException.java           |  37 ----
 .../x/async/modeled/migrations/Migration.java   |  14 +-
 .../modeled/migrations/MigrationException.java  |  37 ++++
 .../modeled/migrations/MigrationManager.java    | 132 +++++++++++++-
 .../migrations/MigrationManagerBuilder.java     |  68 -------
 .../migrations/MigrationManagerImpl.java        | 181 -------------------
 .../async/modeled/migrations/MigrationSet.java  |  11 +-
 .../migrations/TestMigrationManager.java        |  35 ++--
 8 files changed, 196 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
deleted file mode 100644
index 84b21bf..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import java.util.Objects;
-
-public class InvalidMigrationSetException extends RuntimeException
-{
-    private final String migrationId;
-
-    public InvalidMigrationSetException(String migrationId, String message)
-    {
-        super(message);
-        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
-    }
-
-    public String getMigrationId()
-    {
-        return migrationId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index b3919d1..63a7a7d 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -18,8 +18,10 @@
  */
 package org.apache.curator.x.async.modeled.migrations;
 
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import java.util.List;
 import java.util.Objects;
-import java.util.function.UnaryOperator;
+import java.util.function.Supplier;
 
 public interface Migration
 {
@@ -27,12 +29,12 @@ public interface Migration
 
     int version();
 
-    byte[] migrate(byte[] previousBytes);
+    List<CuratorOp> operations();
 
-    static Migration build(String id, int version, UnaryOperator<byte[]> migrateProc)
+    static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
     {
         Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(migrateProc, "migrateProc cannot be null");
+        Objects.requireNonNull(operationsProc, "operationsProc cannot be null");
         return new Migration()
         {
             @Override
@@ -48,9 +50,9 @@ public interface Migration
             }
 
             @Override
-            public byte[] migrate(byte[] previousBytes)
+            public List<CuratorOp> operations()
             {
-                return migrateProc.apply(previousBytes);
+                return operationsProc.get();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
new file mode 100644
index 0000000..1a1d59c
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+
+public class MigrationException extends RuntimeException
+{
+    private final String migrationId;
+
+    public MigrationException(String migrationId, String message)
+    {
+        super(message);
+        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+    }
+
+    public String getMigrationId()
+    {
+        return migrationId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 2d5f39f..47adb1e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -18,20 +18,142 @@
  */
 package org.apache.curator.x.async.modeled.migrations;
 
+import com.google.common.base.Throwables;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.CreateMode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
-public interface MigrationManager
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+public class MigrationManager
 {
-    CompletionStage<List<MetaData>> metaData(ZPath metaDataPath);
+    private final AsyncCuratorFramework client;
+    private final ZPath lockPath;
+    private final ModelSerializer<MetaData> metaDataSerializer;
+    private final Executor executor;
+    private final Duration lockMax;
+
+    private static final String META_DATA_NODE_NAME = "meta-";
+
+    public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+        this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
+        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+    }
+
+    public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
+        return ZNode.models(modeled.childrenAsZNodes());
+    }
+
+    public CompletionStage<Void> migrate(MigrationSet set)
+    {
+        String lockPath = this.lockPath.child(set.id()).fullPath();
+        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
+        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
+        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+    }
+
+    protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
+    {
+        if ( sortedMetaData.size() > set.migrations().size() )
+        {
+            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+        }
 
-    CompletionStage<Void> run();
+        int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
+        List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
+            .stream()
+            .map(m -> new MetaData(m.id(), m.version()))
+            .collect(Collectors.toList());
+        if ( !compareMigrations.equals(sortedMetaData) )
+        {
+            throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+        }
+        return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
+    }
+
+    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
+        return modeled.childrenAsZNodes()
+            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
+            .handle((v, e) -> {
+                release(lock, true);
+                if ( e != null )
+                {
+                    Throwables.propagate(e);
+                }
+                return v;
+            }
+        );
+    }
+
+    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
+    {
+        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+        return ModeledFramework.wrap(client, modelSpec);
+    }
+
+    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
+    {
+        List<MetaData> sortedMetaData = metaDataNodes
+            .stream()
+            .sorted(Comparator.comparing(m -> m.path().fullPath()))
+            .map(ZNode::model)
+            .collect(Collectors.toList());
+
+        List<Migration> toBeApplied;
+        try
+        {
+            toBeApplied = filter(set, sortedMetaData);
+        }
+        catch ( MigrationException e )
+        {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+
+        if ( toBeApplied.size() == 0 )
+        {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
+            .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient));
+    }
 
-    static MigrationManagerBuilder builder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
+    private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
     {
-        return new MigrationManagerBuilder(client, lockPath, metaDataSerializer);
+        List<CuratorOp> operations = new ArrayList<>();
+        for ( Migration migration : toBeApplied )
+        {
+            operations.addAll(migration.operations());
+            MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+            operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
+        }
+        return client.transaction().forOperations(operations).thenApply(__ -> null);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
deleted file mode 100644
index ed48242..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ZPath;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Executor;
-
-public class MigrationManagerBuilder
-{
-    private final AsyncCuratorFramework client;
-    private final ZPath lockPath;
-    private final ModelSerializer<MetaData> metaDataSerializer;
-    private final List<MigrationSet> sets = new ArrayList<>();
-    private Executor executor = Runnable::run;
-    private Duration lockMax = Duration.ofSeconds(15);
-
-    MigrationManagerBuilder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
-    {
-        this.client = Objects.requireNonNull(client, "client cannot be null");
-        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
-        this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
-    }
-
-    public MigrationManager build()
-    {
-        return new MigrationManagerImpl(client, lockPath, metaDataSerializer, executor, lockMax, sets);
-    }
-
-    public MigrationManagerBuilder withExecutor(Executor executor)
-    {
-        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
-        return this;
-    }
-
-    public MigrationManagerBuilder withLockMax(Duration lockMax)
-    {
-        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
-        return this;
-    }
-
-    public MigrationManagerBuilder adding(MigrationSet set)
-    {
-        sets.add(Objects.requireNonNull(set, "set cannot be null"));
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
deleted file mode 100644
index 15c61b2..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZNode;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.CreateMode;
-import java.time.Duration;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.curator.x.async.AsyncWrappers.*;
-
-class MigrationManagerImpl implements MigrationManager
-{
-    private final AsyncCuratorFramework client;
-    private final ZPath lockPath;
-    private final ModelSerializer<MetaData> metaDataSerializer;
-    private final Executor executor;
-    private final Duration lockMax;
-    private final List<MigrationSet> sets;
-
-    private static final String META_DATA_NODE_NAME = "meta-";
-
-    MigrationManagerImpl(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax, List<MigrationSet> sets)
-    {
-        this.client = client;
-        this.lockPath = lockPath;
-        this.metaDataSerializer = metaDataSerializer;
-        this.executor = executor;
-        this.lockMax = lockMax;
-        this.sets = ImmutableList.copyOf(sets);
-    }
-
-    @Override
-    public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
-        return ZNode.models(modeled.childrenAsZNodes());
-    }
-
-    @Override
-    public CompletionStage<Void> run()
-    {
-        Map<String, CompletableFuture<Void>> futures = sets
-            .stream()
-            .map(m -> new AbstractMap.SimpleEntry<>(m.id(), runMigration(m).toCompletableFuture()))
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[futures.size()]));
-    }
-
-    private CompletionStage<Void> runMigration(MigrationSet set)
-    {
-        String lockPath = this.lockPath.child(set.id()).fullPath();
-        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
-        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
-        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
-    }
-
-    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
-        return modeled.childrenAsZNodes()
-            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
-            .handle((v, e) -> {
-                release(lock, true);
-                if ( e != null )
-                {
-                    Throwables.propagate(e);
-                }
-                return v;
-            }
-        );
-    }
-
-    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
-    {
-        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
-        return ModeledFramework.wrap(client, modelSpec);
-    }
-
-    protected void checkIsValid(MigrationSet set, List<MetaData> sortedMetaData) throws InvalidMigrationSetException
-    {
-        if ( sortedMetaData.size() > set.migrations().size() )
-        {
-            throw new InvalidMigrationSetException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
-        }
-
-        int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
-        List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
-            .stream()
-            .map(m -> new MetaData(m.id(), m.version()))
-            .collect(Collectors.toList());
-        if ( !compareMigrations.equals(sortedMetaData) )
-        {
-            throw new InvalidMigrationSetException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
-        }
-    }
-
-    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
-    {
-        List<MetaData> sortedMetaData = metaDataNodes
-            .stream()
-            .sorted(Comparator.comparing(m -> m.path().fullPath()))
-            .map(ZNode::model)
-            .collect(Collectors.toList());
-        try
-        {
-            checkIsValid(set, sortedMetaData);
-        }
-        catch ( InvalidMigrationSetException e )
-        {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(e);
-            return future;
-        }
-
-        List<Migration> toBeApplied = set.migrations().subList(sortedMetaData.size(), set.migrations().size());
-        if ( toBeApplied.size() == 0 )
-        {
-            return CompletableFuture.completedFuture(null);
-        }
-
-        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
-            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, metaDataClient));
-    }
-
-    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
-    {
-        ModelSpec<byte[]> modelSpec = ModelSpec.builder(set.path(), ModelSerializer.raw).build();
-        ModeledFramework<byte[]> modeled = ModeledFramework.wrap(client, modelSpec);
-        return modeled.childrenAsZNodes().thenCompose(nodes -> {
-            List<CuratorOp> operations = new ArrayList<>();
-            for ( ZNode<byte[]> node : nodes )
-            {
-                byte[] currentBytes = node.model();
-                for ( Migration migration : toBeApplied )
-                {
-                    currentBytes = migration.migrate(currentBytes);
-                    MetaData thisMetaData = new MetaData(migration.id(), migration.version());
-                    operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
-                }
-                operations.add(modeled.child(node.path().nodeName()).updateOp(currentBytes, node.stat().getVersion()));
-            }
-            return client.transaction().forOperations(operations).thenApply(__ -> null);
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
index 9e41989..0a0dbe0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
@@ -27,16 +27,13 @@ public interface MigrationSet
 {
     String id();
 
-    ZPath path();
-
     ZPath metaDataPath();
 
     List<Migration> migrations();
 
-    static MigrationSet build(String id, ZPath path, ZPath metaDataPath, List<Migration> migrations)
+    static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
     {
         Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(path, "path cannot be null");
         Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
         final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
         return new MigrationSet()
@@ -48,12 +45,6 @@ public interface MigrationSet
             }
 
             @Override
-            public ZPath path()
-            {
-                return path;
-            }
-
-            @Override
             public ZPath metaDataPath()
             {
                 return metaDataPath;

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index d709abe..daf69cd 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.migrations;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -37,7 +38,11 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.UnaryOperator;
 
 public class TestMigrationManager extends CompletableBaseClassForTests
@@ -47,6 +52,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     private ModelSpec<ModelV1> v1Spec;
     private ModelSpec<ModelV2> v2Spec;
     private ModelSpec<ModelV3> v3Spec;
+    private ExecutorService executor;
 
     @BeforeMethod
     @Override
@@ -54,10 +60,10 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     {
         super.setup();
 
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
-        client.start();
+        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        rawClient.start();
 
-        this.client = AsyncCuratorFramework.wrap(client);
+        this.client = AsyncCuratorFramework.wrap(rawClient);
 
         ObjectMapper mapper = new ObjectMapper();
         UnaryOperator<byte[]> from1to2 = bytes -> {
@@ -89,13 +95,20 @@ public class TestMigrationManager extends CompletableBaseClassForTests
 
         ZPath modelPath = ZPath.parse("/test/it");
 
-        Migration m1 = Migration.build("1",1, from1to2);
-        Migration m2 = Migration.build("2",1, from2to3);
-        migrationSet = MigrationSet.build("1", modelPath, ZPath.parse("/metadata"), Arrays.asList(m1, m2));
-
         v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
         v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
         v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+
+        CuratorOp v1op = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+        CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+        CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
+
+        Migration m1 = Migration.build("1",1, () -> Collections.singletonList(v1op));
+        Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+        Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
+
+        executor = Executors.newCachedThreadPool();
     }
 
     @AfterMethod
@@ -103,6 +116,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     public void teardown() throws Exception
     {
         CloseableUtils.closeQuietly(client.unwrap());
+        executor.shutdownNow();
         super.teardown();
     }
 
@@ -113,11 +127,8 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         ModelV1 v1 = new ModelV1("John Galt");
         complete(v1Client.child("1").set(v1));
 
-        MigrationManager manager = MigrationManager.builder(this.client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class))
-            .adding(migrationSet)
-            .build();
-
-        complete(manager.run());
+        MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+        complete(manager.migrate(migrationSet));
 
         ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
         complete(v3Client.child("1").read(), (m, e) -> {