You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/18 13:16:33 UTC

[11/23] curator git commit: major refactoring and simplification. No longer dependent on any modeled code so it's moved to the parent async package

major refactoring and simplification. No longer dependent on any modeled code so it's moved to the parent async package


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d80651a7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d80651a7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d80651a7

Branch: refs/heads/master
Commit: d80651a7a276b0ce999601c30e205aceaae94d4c
Parents: 2ab172a
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:32:37 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:32:37 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/x/async/AsyncWrappers.java   |  96 +++++++-
 .../curator/x/async/migrations/Migration.java   |  37 ++++
 .../x/async/migrations/MigrationException.java  |  37 ++++
 .../x/async/migrations/MigrationManager.java    | 195 ++++++++++++++++
 .../x/async/migrations/MigrationSet.java        |  72 ++++++
 .../x/async/modeled/migrations/MetaData.java    |  25 ---
 .../x/async/modeled/migrations/Migration.java   |  37 ----
 .../modeled/migrations/MigrationException.java  |  37 ----
 .../modeled/migrations/MigrationManager.java    | 220 -------------------
 .../async/modeled/migrations/MigrationSet.java  |  73 ------
 .../async/migrations/TestMigrationManager.java  | 173 +++++++++++++++
 .../x/async/migrations/models/ModelV1.java      |  39 ++++
 .../x/async/migrations/models/ModelV2.java      |  46 ++++
 .../x/async/migrations/models/ModelV3.java      |  53 +++++
 .../migrations/TestMigrationManager.java        | 173 ---------------
 .../modeled/migrations/models/ModelV1.java      |  39 ----
 .../modeled/migrations/models/ModelV2.java      |  46 ----
 .../modeled/migrations/models/ModelV3.java      |  53 -----
 18 files changed, 747 insertions(+), 704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..d7b3cc3 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,12 +18,16 @@
  */
 package org.apache.curator.x.async;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -70,6 +74,66 @@ import java.util.concurrent.TimeUnit;
 public class AsyncWrappers
 {
     /**
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned. i.e. if the initial children() call returns
+     * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+     * </p>
+     *
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path)
+    {
+        // delegate to the three-argument overload with isCompressed = false
+        return childrenWithData(client, path, false);
+    }
+
+    /**
+     * <p>
+     * Load the children of the given path together with the data of each child. The resulting
+     * map is keyed by the full path of each child.
+     * IMPORTANT: one ZooKeeper query is issued for each child node returned, i.e. if the
+     * initial children() call returns 10 nodes, 10 additional ZooKeeper queries are made
+     * to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+     * </p>
+     *
+     * @param client the client used for the queries
+     * @param path the parent path whose children are read
+     * @param isCompressed pass true if data is compressed
+     * @return stage that completes with the map of child path to data
+     */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed)
+    {
+        CompletableFuture<Map<String, byte[]>> result = new CompletableFuture<>();
+        client.getChildren().forPath(path).handle((names, error) -> {
+            if ( error == null )
+            {
+                // the per-child reads are responsible for completing the result
+                completeChildren(client, result, path, names, isCompressed);
+                return null;
+            }
+
+            boolean nodeIsMissing = Throwables.getRootCause(error) instanceof KeeperException.NoNodeException;
+            if ( nodeIsMissing )
+            {
+                // a missing node is reported as "no children" rather than as a failure
+                result.complete(Maps.newHashMap());
+            }
+            else
+            {
+                result.completeExceptionally(error);
+            }
+            return null;
+        });
+        return result;
+    }
+
+    /**
      * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
      * the given executor
      *
@@ -279,6 +343,36 @@ public class AsyncWrappers
         }
     }
 
+    /**
+     * Reads the data of every child and completes the future with the collected map once
+     * every read has succeeded. A failed read completes the future exceptionally.
+     *
+     * @param client the client used to read the data
+     * @param future the future to complete with the map of full child path to data
+     * @param parentPath path of the parent node
+     * @param children names of the children of the parent
+     * @param isCompressed true to read the data via decompressed()
+     */
+    private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed)
+    {
+        Map<String, byte[]> dataByPath = Maps.newHashMap();
+        if ( children.isEmpty() )
+        {
+            future.complete(dataByPath);
+            return;
+        }
+
+        for ( String child : children )
+        {
+            String childPath = ZKPaths.makePath(parentPath, child);
+            AsyncStage<byte[]> read;
+            if ( isCompressed )
+            {
+                read = client.getData().decompressed().forPath(childPath);
+            }
+            else
+            {
+                read = client.getData().forPath(childPath);
+            }
+            read.handle((data, error) -> {
+                if ( error == null )
+                {
+                    // NOTE(review): a plain HashMap is shared by all callbacks - presumably they
+                    // are all delivered on the same thread; confirm
+                    dataByPath.put(childPath, data);
+                    boolean allChildrenRead = dataByPath.size() == children.size();
+                    if ( allChildrenRead )
+                    {
+                        future.complete(dataByPath);
+                    }
+                }
+                else
+                {
+                    future.completeExceptionally(error);
+                }
+                return null;
+            });
+        }
+    }
+
     private AsyncWrappers()
     {
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
new file mode 100644
index 0000000..daac435
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import java.util.List;
+
+/**
+ * Models a single migration/transition: a list of operations that is
+ * executed in a transaction
+ */
+@FunctionalInterface
+public interface Migration
+{
+    /**
+     * Return the operations to execute in a transaction. IMPORTANT: during a migration
+     * this method may be called multiple times.
+     *
+     * @return operations
+     */
+    List<CuratorOp> operations();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
new file mode 100644
index 0000000..c4971ce
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import java.util.Objects;
+
+/**
+ * Unchecked exception that carries a migration ID in addition to its message
+ */
+public class MigrationException extends RuntimeException
+{
+    private final String migrationId;
+
+    /**
+     * @param migrationId the migration ID this exception relates to
+     * @param message the exception message
+     * @throws NullPointerException if migrationId is null
+     */
+    public MigrationException(String migrationId, String message)
+    {
+        super(message);
+        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+    }
+
+    /**
+     * @return the migration ID passed to the constructor (never null)
+     */
+    public String getMigrationId()
+    {
+        return migrationId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
new file mode 100644
index 0000000..cb3d6ff
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.google.common.base.Throwables;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.imps.ExtractingCuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+/**
+ * Manages migrations
+ */
+public class MigrationManager
+{
+    private final AsyncCuratorFramework client;
+    private final String lockPath;
+    private final Executor executor;
+    private final Duration lockMax;
+
+    private static final String META_DATA_NODE_NAME = "meta-";
+
+    /**
+     * @param client the curator client
+     * @param lockPath base path for locks used by the manager
+     * @param executor the executor to use
+     * @param lockMax max time to wait for locks
+     * @throws NullPointerException if any argument is null
+     */
+    public MigrationManager(AsyncCuratorFramework client, String lockPath, Executor executor, Duration lockMax)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+    }
+
+    /**
+     * Process the given migration set. A lock on a path built from the manager's lock path and
+     * the set's ID is requested first; the migrations run once the lock stage has completed.
+     *
+     * @param set the set
+     * @return completion stage. If there is a migration-specific error, the stage will be completed
+     * exceptionally with {@link org.apache.curator.x.async.migrations.MigrationException}.
+     */
+    public CompletionStage<Void> migrate(MigrationSet set)
+    {
+        String setLockPath = ZKPaths.makePath(lockPath, set.id());
+        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), setLockPath);
+        return lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor)
+            .thenCompose(__ -> runMigrationInLock(lock, set));
+    }
+
+    /**
+     * Can be overridden to change how the comparison to previous migrations is done. The default
+     * version ensures that the meta data from previous migrations matches the current migration
+     * set exactly (by order and operation hash). If there is a mismatch, <code>MigrationException</code> is thrown.
+     *
+     * @param set the migration set being applied
+     * @param operationHashesInOrder previous operation hashes (may be empty)
+     * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
+     * @throws MigrationException errors
+     */
+    protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException
+    {
+        // take one snapshot so every check below sees the same list
+        List<Migration> migrations = set.migrations();
+        int appliedCount = operationHashesInOrder.size();
+        if ( appliedCount > migrations.size() )
+        {
+            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
+        }
+
+        // the guard above ensures appliedCount <= migrations.size()
+        for ( int i = 0; i < appliedCount; ++i )
+        {
+            byte[] setHash = hash(migrations.get(i).operations());
+            if ( !Arrays.equals(setHash, operationHashesInOrder.get(i)) )
+            {
+                throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
+            }
+        }
+
+        // everything after the already-recorded prefix still has to be applied
+        return migrations.subList(appliedCount, migrations.size());
+    }
+
+    /**
+     * Computes a SHA-256 digest over the given operations, in list order.
+     *
+     * @param operations the operations to hash
+     * @return the digest bytes
+     */
+    private byte[] hash(List<CuratorOp> operations)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("SHA-256");
+        }
+        catch ( NoSuchAlgorithmException e )
+        {
+            throw new RuntimeException(e);
+        }
+        for ( CuratorOp op : operations )
+        {
+            if ( op instanceof ExtractingCuratorOp )
+            {
+                ((ExtractingCuratorOp)op).addToDigest(digest);
+            }
+            else
+            {
+                // Explicit charset: the platform default would let the same operations hash
+                // differently on differently configured JVMs.
+                // NOTE(review): toString() of an arbitrary CuratorOp may not be stable - confirm
+                digest.update(op.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            }
+        }
+        return digest.digest();
+    }
+
+    /**
+     * Loads the stored meta data, applies the outstanding migrations and releases the lock
+     * when that work has finished, whether it succeeded or failed.
+     *
+     * @param lock the lock to release when the work is done
+     * @param set the migration set being processed
+     * @return stage that completes when the migrations are done and the lock has been released
+     */
+    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+    {
+        CompletionStage<Void> work;
+        try
+        {
+            work = childrenWithData(client, set.metaDataPath())
+                .thenCompose(metaData -> applyMetaData(set, metaData));
+        }
+        catch ( RuntimeException | Error failure )
+        {
+            // the chain could not even be built (e.g. metaDataPath() threw): still release the lock
+            release(lock, true);
+            throw failure;
+        }
+
+        // whenComplete passes the upstream result or exception through unchanged, so no
+        // manual rethrow is needed after the release
+        return work.whenComplete((v, e) -> release(lock, true));
+    }
+
+    /**
+     * Orders the stored hashes by the path of their meta data node, lets
+     * {@link #filter(MigrationSet, List)} decide which migrations remain and applies those.
+     *
+     * @param set the migration set being processed
+     * @param metaData stored hashes keyed by the path of their meta data node
+     * @return stage that completes when the remaining migrations have been applied; completed
+     * exceptionally if the filter throws <code>MigrationException</code>
+     */
+    private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData)
+    {
+        List<byte[]> hashesInOrder = metaData.entrySet()
+            .stream()
+            .sorted(Map.Entry.comparingByKey())
+            .map(Map.Entry::getValue)
+            .collect(Collectors.toList());
+
+        List<Migration> pending;
+        try
+        {
+            pending = filter(set, hashesInOrder);
+        }
+        catch ( MigrationException e )
+        {
+            // report the mismatch through the stage rather than by throwing
+            CompletableFuture<Void> failed = new CompletableFuture<>();
+            failed.completeExceptionally(e);
+            return failed;
+        }
+
+        if ( pending.isEmpty() )
+        {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return asyncEnsureContainers(client, set.metaDataPath())
+            .thenCompose(__ -> applyMetaDataAfterEnsure(set, pending));
+    }
+
+    /**
+     * Applies the given migrations one after another. Each migration is started only after
+     * the previous one has succeeded, so a failure stops the remaining migrations and the
+     * returned stage completes exceptionally with that failure.
+     *
+     * @param set the migration set being processed
+     * @param toBeApplied the migrations to apply, in order
+     * @return stage that completes when all of the migrations have been applied
+     */
+    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied)
+    {
+        String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), META_DATA_NODE_NAME);
+        CompletionStage<Void> chain = CompletableFuture.completedFuture(null);
+        for ( Migration migration : toBeApplied )
+        {
+            chain = chain.thenCompose(__ -> applyMigration(migration, metaDataBasePath));
+        }
+        return chain;
+    }
+
+    /**
+     * Runs a single migration in one transaction that also creates a sequential meta data
+     * node holding the hash of the migration's operations.
+     */
+    private CompletionStage<Void> applyMigration(Migration migration, String metaDataBasePath)
+    {
+        List<CuratorOp> operations = new ArrayList<>(migration.operations());
+        // hash only the migration's own operations; the meta data create is appended afterwards
+        byte[] operationsHash = hash(operations);
+        operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, operationsHash));
+        return client.transaction().forOperations(operations).thenApply(__ -> null);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
new file mode 100644
index 0000000..089d3d8
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Models a set of migrations. Each individual migration is applied
+ * in a transaction.
+ */
+public interface MigrationSet
+{
+    /**
+     * @return the unique ID for this migration set
+     */
+    String id();
+
+    /**
+     * @return where to store the meta data for this migration set
+     */
+    String metaDataPath();
+
+    /**
+     * @return list of migrations in the order that they should be applied
+     */
+    List<Migration> migrations();
+
+    /**
+     * Build a migration set from the given values. The list of migrations is copied, so later
+     * changes to the passed list do not affect the returned set.
+     *
+     * @param id the unique ID for the migration set
+     * @param metaDataPath where to store the meta data for the migration set
+     * @param migrations list of migrations in the order that they should be applied
+     * @return the migration set
+     * @throws NullPointerException if any argument is null
+     */
+    static MigrationSet build(String id, String metaDataPath, List<Migration> migrations)
+    {
+        Objects.requireNonNull(id, "id cannot be null");
+        Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
+        // checked explicitly so the failure carries a message like the other arguments
+        Objects.requireNonNull(migrations, "migrations cannot be null");
+        final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
+        return new MigrationSet()
+        {
+            @Override
+            public String id()
+            {
+                return id;
+            }
+
+            @Override
+            public String metaDataPath()
+            {
+                return metaDataPath;
+            }
+
+            @Override
+            public List<Migration> migrations()
+            {
+                return migrationsCopy;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
deleted file mode 100644
index 8377967..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-@FunctionalInterface
-public interface MetaData
-{
-    byte[] operationHash();
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
deleted file mode 100644
index 972c59e..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import java.util.List;
-
-/**
- * Models a single migration/transition
- */
-@FunctionalInterface
-public interface Migration
-{
-    /**
-     * Return the operations to execute in a transaction. IMPORTANT: during a migration
-     * this method may be called multiple times.
-     *
-     * @return operations
-     */
-    List<CuratorOp> operations();
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
deleted file mode 100644
index 1a1d59c..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import java.util.Objects;
-
-public class MigrationException extends RuntimeException
-{
-    private final String migrationId;
-
-    public MigrationException(String migrationId, String message)
-    {
-        super(message);
-        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
-    }
-
-    public String getMigrationId()
-    {
-        return migrationId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
deleted file mode 100644
index d6d37de..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.base.Throwables;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.imps.ExtractingCuratorOp;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZNode;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.CreateMode;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.curator.x.async.AsyncWrappers.*;
-
-/**
- * Manages migrations
- */
-public class MigrationManager
-{
-    private final AsyncCuratorFramework client;
-    private final ZPath lockPath;
-    private final Executor executor;
-    private final Duration lockMax;
-
-    private static final String META_DATA_NODE_NAME = "meta-";
-
-    /**
-     * @param client the curator client
-     * @param lockPath base path for locks used by the manager
-     * @param executor the executor to use
-     * @param lockMax max time to wait for locks
-     */
-    public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax)
-    {
-        this.client = Objects.requireNonNull(client, "client cannot be null");
-        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
-        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
-        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
-    }
-
-    /**
-     * Process the given migration set
-     *
-     * @param set the set
-     * @return completion stage. If there is a migration-specific error, the stage will be completed
-     * exceptionally with {@link org.apache.curator.x.async.modeled.migrations.MigrationException}.
-     */
-    public CompletionStage<Void> migrate(MigrationSet set)
-    {
-        String lockPath = this.lockPath.child(set.id()).fullPath();
-        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
-        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
-        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
-    }
-
-    /**
-     * Can be overridden to change how the comparison to previous migrations is done. The default
-     * version ensures that the meta data from previous migrations matches the current migration
-     * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
-     *
-     * @param set the migration set being applied
-     * @param sortedMetaData previous migration meta data (may be empty)
-     * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
-     * @throws MigrationException errors
-     */
-    protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
-    {
-        if ( sortedMetaData.size() > set.migrations().size() )
-        {
-            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
-        }
-
-        int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
-        List<Migration> subList = set.migrations().subList(0, compareSize);
-        for ( int i = 0; i < compareSize; ++i )
-        {
-            byte[] setHash = hash(set.migrations().get(i).operations()).operationHash();
-            if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) )
-            {
-                throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
-            }
-        }
-        return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
-    }
-
-    private MetaData hash(List<CuratorOp> operations)
-    {
-        MessageDigest digest;
-        try
-        {
-            digest = MessageDigest.getInstance("SHA-256");
-        }
-        catch ( NoSuchAlgorithmException e )
-        {
-            throw new RuntimeException(e);
-        }
-        operations.forEach(op -> {
-            if ( op instanceof ExtractingCuratorOp )
-            {
-                ((ExtractingCuratorOp)op).addToDigest(digest);
-            }
-            else
-            {
-                digest.update(op.toString().getBytes());
-            }
-        });
-        return digest::digest;
-    }
-
-    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
-        return modeled.childrenAsZNodes()
-            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
-            .handle((v, e) -> {
-                release(lock, true);
-                if ( e != null )
-                {
-                    Throwables.propagate(e);
-                }
-                return v;
-            }
-        );
-    }
-
-    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
-    {
-        ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>()
-        {
-            @Override
-            public byte[] serialize(MetaData model)
-            {
-                return model.operationHash();
-            }
-
-            @Override
-            public MetaData deserialize(byte[] bytes)
-            {
-                return () -> bytes;
-            }
-        };
-        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
-        return ModeledFramework.wrap(client, modelSpec);
-    }
-
-    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
-    {
-        List<MetaData> sortedMetaData = metaDataNodes
-            .stream()
-            .sorted(Comparator.comparing(m -> m.path().fullPath()))
-            .map(ZNode::model)
-            .collect(Collectors.toList());
-
-        List<Migration> toBeApplied;
-        try
-        {
-            toBeApplied = filter(set, sortedMetaData);
-        }
-        catch ( MigrationException e )
-        {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(e);
-            return future;
-        }
-
-        if ( toBeApplied.size() == 0 )
-        {
-            return CompletableFuture.completedFuture(null);
-        }
-
-        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
-            .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient));
-    }
-
-    private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
-    {
-        List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
-            List<CuratorOp> operations = new ArrayList<>();
-            operations.addAll(migration.operations());
-            MetaData thisMetaData = hash(operations);
-            operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
-            return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
-        }).collect(Collectors.toList());
-        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
deleted file mode 100644
index c4cd90e..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.curator.x.async.modeled.ZPath;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Models a set of migrations. Each individual migration is applied
- * in a transaction.
- */
-public interface MigrationSet
-{
-    /**
-     * @return the unique ID for this migration set
-     */
-    String id();
-
-    /**
-     * @return where to store the meta data for this migration set
-     */
-    ZPath metaDataPath();
-
-    /**
-     * @return list of migrations in the order that they should be applied
-     */
-    List<Migration> migrations();
-
-    static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
-    {
-        Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
-        final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
-        return new MigrationSet()
-        {
-            @Override
-            public String id()
-            {
-                return id;
-            }
-
-            @Override
-            public ZPath metaDataPath()
-            {
-                return metaDataPath;
-            }
-
-            @Override
-            public List<Migration> migrations()
-            {
-                return migrationsCopy;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
new file mode 100644
index 0000000..42bc76d
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.modeled.JacksonModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.migrations.models.ModelV1;
+import org.apache.curator.x.async.migrations.models.ModelV2;
+import org.apache.curator.x.async.migrations.models.ModelV3;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.UnaryOperator;
+
+public class TestMigrationManager extends CompletableBaseClassForTests // drives MigrationManager.migrate() and checks the resulting node contents
+{
+    private AsyncCuratorFramework client;
+    private ModelSpec<ModelV1> v1Spec; // all three specs use the same path (/test/it) and differ only in model class
+    private ModelSpec<ModelV2> v2Spec;
+    private ModelSpec<ModelV3> v3Spec;
+    private ExecutorService executor; // handed to MigrationManager; shut down in teardown()
+    private CuratorOp v1opA; // create op for the parent of the model path
+    private CuratorOp v1opB; // create op carrying a ModelV1
+    private CuratorOp v2op; // update op carrying a ModelV2
+    private CuratorOp v3op; // update op carrying a ModelV3
+    private MigrationManager manager;
+    // Runs before every test method: builds a fresh client, the model specs, the transaction ops and the manager.
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+        // server and timing are not declared in this class; presumably inherited from the base class - TODO confirm
+        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        rawClient.start();
+
+        this.client = AsyncCuratorFramework.wrap(rawClient);
+        // NOTE(review): mapper, from1to2 and from2to3 below are never referenced again in this class
+        ObjectMapper mapper = new ObjectMapper();
+        UnaryOperator<byte[]> from1to2 = bytes -> {
+            try
+            {
+                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
+                ModelV2 v2 = new ModelV2(v1.getName(), 64); // age is hard-coded to 64 because ModelV1 has no age
+                return mapper.writeValueAsBytes(v2);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        UnaryOperator<byte[]> from2to3 = bytes -> {
+            try
+            {
+                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
+                String[] nameParts = v2.getName().split("\\s");
+                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge()); // a name without whitespace yields one part, so nameParts[1] throws ArrayIndexOutOfBoundsException
+                return mapper.writeValueAsBytes(v3);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        ZPath modelPath = ZPath.parse("/test/it");
+        // One spec per model version, all bound to modelPath
+        v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
+        v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
+        v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+        // Ops are only built here; the tests group them into Migration instances and pass those to the manager
+        v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
+        v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+        v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+        v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
+
+        executor = Executors.newCachedThreadPool();
+        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
+    }
+    // Runs after every test method: closes the raw client, stops the executor, then delegates to the base class.
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        CloseableUtils.closeQuietly(client.unwrap());
+        executor.shutdownNow();
+        super.teardown();
+    }
+    // Submits all three migrations as one set and asserts only the final (ModelV3) values.
+    @Test
+    public void testBasic() throws Exception
+    {
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        Migration m2 = () -> Collections.singletonList(v2op);
+        Migration m3 = () -> Collections.singletonList(v3op);
+        MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3));
+
+        complete(manager.migrate(migrationSet));
+        // complete(...) is not defined in this class; presumably a base-class helper that waits on the stage - TODO confirm
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
+        });
+    }
+    // Calls migrate() three times with a growing set (same id and metadata path) and asserts the node after each call.
+    @Test
+    public void testStaged() throws Exception
+    {
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1));
+        complete(manager.migrate(migrationSet));
+        // Stage 1: only m1 was submitted, so the node should read back as the ModelV1 from v1opB
+        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
+        complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
+        // Stage 2: the set is rebuilt as (m1, m2); the assertions expect the ModelV2 values from v2op
+        Migration m2 = () -> Collections.singletonList(v2op);
+        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
+        complete(v2Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getName(), "Test 2");
+            Assert.assertEquals(m.getAge(), 10);
+        });
+        // Stage 3: the set is rebuilt as (m1, m2, m3); the assertions expect the ModelV3 values from v3op
+        Migration m3 = () -> Collections.singletonList(v3op);
+        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
new file mode 100644
index 0000000..46fc5ff
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV1 // test model, version 1: holds only a name
+{
+    private final String name;
+    // No-arg constructor defaults name to ""; presumably there for Jackson, which the test uses to serialize this class - TODO confirm
+    public ModelV1()
+    {
+        this("");
+    }
+    // Stores the given name as-is; no null check is performed
+    public ModelV1(String name)
+    {
+        this.name = name;
+    }
+    // Returns the name supplied at construction
+    public String getName()
+    {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
new file mode 100644
index 0000000..31e05bd
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV2 // test model, version 2: a name plus an age
+{
+    private final String name;
+    private final int age;
+    // No-arg constructor defaults to ("", 0); presumably there for Jackson, which the test uses to serialize this class - TODO confirm
+    public ModelV2()
+    {
+        this("", 0);
+    }
+    // Stores both values as-is; no validation is performed
+    public ModelV2(String name, int age)
+    {
+        this.name = name;
+        this.age = age;
+    }
+    // Returns the name supplied at construction
+    public String getName()
+    {
+        return name;
+    }
+    // Returns the age supplied at construction
+    public int getAge()
+    {
+        return age;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
new file mode 100644
index 0000000..519923c
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV3 // test model, version 3: separate first and last names plus an age
+{
+    private final String firstName;
+    private final String lastName;
+    private final int age;
+    // No-arg constructor defaults to ("", "", 0); presumably there for Jackson, which the test uses to serialize this class - TODO confirm
+    public ModelV3()
+    {
+        this("", "", 0);
+    }
+    // Stores all three values as-is; no validation is performed
+    public ModelV3(String firstName, String lastName, int age)
+    {
+        this.firstName = firstName;
+        this.lastName = lastName;
+        this.age = age;
+    }
+    // Returns the first name supplied at construction
+    public String getFirstName()
+    {
+        return firstName;
+    }
+    // Returns the last name supplied at construction
+    public String getLastName()
+    {
+        return lastName;
+    }
+    // Returns the age supplied at construction
+    public int getAge()
+    {
+        return age;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
deleted file mode 100644
index 45aa130..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.CompletableBaseClassForTests;
-import org.apache.curator.x.async.modeled.JacksonModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV1;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV2;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV3;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.UnaryOperator;
-
-public class TestMigrationManager extends CompletableBaseClassForTests
-{
-    private AsyncCuratorFramework client;
-    private ModelSpec<ModelV1> v1Spec;
-    private ModelSpec<ModelV2> v2Spec;
-    private ModelSpec<ModelV3> v3Spec;
-    private ExecutorService executor;
-    private CuratorOp v1opA;
-    private CuratorOp v1opB;
-    private CuratorOp v2op;
-    private CuratorOp v3op;
-    private MigrationManager manager;
-
-    @BeforeMethod
-    @Override
-    public void setup() throws Exception
-    {
-        super.setup();
-
-        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
-        rawClient.start();
-
-        this.client = AsyncCuratorFramework.wrap(rawClient);
-
-        ObjectMapper mapper = new ObjectMapper();
-        UnaryOperator<byte[]> from1to2 = bytes -> {
-            try
-            {
-                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
-                ModelV2 v2 = new ModelV2(v1.getName(), 64);
-                return mapper.writeValueAsBytes(v2);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
-        UnaryOperator<byte[]> from2to3 = bytes -> {
-            try
-            {
-                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
-                String[] nameParts = v2.getName().split("\\s");
-                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
-                return mapper.writeValueAsBytes(v3);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
-        ZPath modelPath = ZPath.parse("/test/it");
-
-        v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
-        v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
-        v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
-
-        v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
-        v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
-        v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
-        v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
-
-        executor = Executors.newCachedThreadPool();
-        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
-    }
-
-    @AfterMethod
-    @Override
-    public void teardown() throws Exception
-    {
-        CloseableUtils.closeQuietly(client.unwrap());
-        executor.shutdownNow();
-        super.teardown();
-    }
-
-    @Test
-    public void testBasic() throws Exception
-    {
-        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
-        Migration m2 = () -> Collections.singletonList(v2op);
-        Migration m3 = () -> Collections.singletonList(v3op);
-        MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
-
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
-        complete(v3Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getAge(), 30);
-            Assert.assertEquals(m.getFirstName(), "One");
-            Assert.assertEquals(m.getLastName(), "Two");
-        });
-    }
-
-    @Test
-    public void testStaged() throws Exception
-    {
-        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
-        MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
-        complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
-
-        Migration m2 = () -> Collections.singletonList(v2op);
-        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
-        complete(v2Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getName(), "Test 2");
-            Assert.assertEquals(m.getAge(), 10);
-        });
-
-        Migration m3 = () -> Collections.singletonList(v3op);
-        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
-        complete(v3Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getAge(), 30);
-            Assert.assertEquals(m.getFirstName(), "One");
-            Assert.assertEquals(m.getLastName(), "Two");
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
deleted file mode 100644
index 02b13b7..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV1
-{
-    private final String name;
-
-    public ModelV1()
-    {
-        this("");
-    }
-
-    public ModelV1(String name)
-    {
-        this.name = name;
-    }
-
-    public String getName()
-    {
-        return name;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
deleted file mode 100644
index bd77a2e..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV2
-{
-    private final String name;
-    private final int age;
-
-    public ModelV2()
-    {
-        this("", 0);
-    }
-
-    public ModelV2(String name, int age)
-    {
-        this.name = name;
-        this.age = age;
-    }
-
-    public String getName()
-    {
-        return name;
-    }
-
-    public int getAge()
-    {
-        return age;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
deleted file mode 100644
index d4713b8..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV3
-{
-    private final String firstName;
-    private final String lastName;
-    private final int age;
-
-    public ModelV3()
-    {
-        this("", "", 0);
-    }
-
-    public ModelV3(String firstName, String lastName, int age)
-    {
-        this.firstName = firstName;
-        this.lastName = lastName;
-        this.age = age;
-    }
-
-    public String getFirstName()
-    {
-        return firstName;
-    }
-
-    public String getLastName()
-    {
-        return lastName;
-    }
-
-    public int getAge()
-    {
-        return age;
-    }
-}