You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/18 13:16:23 UTC

[01/23] curator git commit: initial work on migrations done. Needs lots of testing and doc. However, found bug CURATOR-423 and will need to fix that first

Repository: curator
Updated Branches:
  refs/heads/master 7d4f06238 -> 7a60af0dd


initial work on migrations done. Needs lots of testing and doc. However, found bug CURATOR-423 and will need to fix that first


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4cd731c9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4cd731c9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4cd731c9

Branch: refs/heads/master
Commit: 4cd731c92d34a075d9d5a9610ec2610d69aa7792
Parents: 716fb4a
Author: randgalt <ra...@apache.org>
Authored: Thu Jul 13 23:44:59 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jul 13 23:44:59 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/x/async/AsyncWrappers.java   |  38 ++--
 .../x/async/modeled/ModelSerializer.java        |  18 ++
 .../async/modeled/ModeledFrameworkBuilder.java  |  20 +-
 .../curator/x/async/modeled/ModeledOptions.java |  29 +++
 .../modeled/details/ModeledFrameworkImpl.java   |  33 +++-
 .../InvalidMigrationSetException.java           |  37 ++++
 .../x/async/modeled/migrations/MetaData.java    |  84 +++++++++
 .../x/async/modeled/migrations/Migration.java   |  57 ++++++
 .../modeled/migrations/MigrationManager.java    |  37 ++++
 .../migrations/MigrationManagerBuilder.java     |  68 +++++++
 .../migrations/MigrationManagerImpl.java        | 181 +++++++++++++++++++
 .../async/modeled/migrations/MigrationSet.java  |  69 +++++++
 .../x/async/modeled/migrations/Result.java      |  23 +++
 .../x/async/CompletableBaseClassForTests.java   |   8 +-
 .../migrations/TestMigrationManager.java        | 129 +++++++++++++
 .../modeled/migrations/models/ModelV1.java      |  39 ++++
 .../modeled/migrations/models/ModelV2.java      |  46 +++++
 .../modeled/migrations/models/ModelV3.java      |  53 ++++++
 18 files changed, 934 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index e982cf2..7da82fc 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -20,7 +20,10 @@ package org.apache.curator.x.async;
 
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.api.ExistsOption;
 import org.apache.curator.x.async.modeled.ZPath;
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
@@ -67,38 +70,21 @@ public class AsyncWrappers
 {
     /**
      * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
-     * the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
-     *
-     * @param client client
-     * @param path path to ensure
-     * @return stage
-     */
-    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
-    {
-        return asyncEnsureContainers(client, path, null);
-    }
-
-    /**
-     * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
      * the given executor
      *
      * @param client client
      * @param path path to ensure
      * @return stage
      */
-    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path, Executor executor)
+    public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
     {
-        Runnable proc = () -> {
-            try
-            {
-                client.unwrap().createContainers(path.fullPath());
-            }
-            catch ( Exception e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-        return (executor != null) ? CompletableFuture.runAsync(proc, executor) : CompletableFuture.runAsync(proc);
+        Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
+        return client
+            .checkExists()
+            .withOptions(options)
+            .forPath(path.child("foo").fullPath())
+            .thenApply(__ -> null)
+            ;
     }
 
     /**
@@ -284,7 +270,7 @@ public class AsyncWrappers
                 future.complete(null);
             }
         }
-        catch ( Exception e )
+        catch ( Throwable e )
         {
             ThreadUtils.checkInterrupted(e);
             future.completeExceptionally(e);

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
index 428096e..476f314 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
@@ -40,4 +40,22 @@ public interface ModelSerializer<T>
      * @throws RuntimeException if <code>bytes</code> is invalid or there was an error deserializing
      */
     T deserialize(byte[] bytes);
+
+    /**
+     * A pass through serializer
+     */
+    ModelSerializer<byte[]> raw = new ModelSerializer<byte[]>()
+    {
+        @Override
+        public byte[] serialize(byte[] model)
+        {
+            return model;
+        }
+
+        @Override
+        public byte[] deserialize(byte[] bytes)
+        {
+            return bytes;
+        }
+    };
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
index 2e8bec3..1df68e6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
@@ -18,13 +18,16 @@
  */
 package org.apache.curator.x.async.modeled;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.WatchMode;
 import org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl;
 import org.apache.zookeeper.WatchedEvent;
+import java.util.Collections;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.UnaryOperator;
 
 public class ModeledFrameworkBuilder<T>
@@ -35,6 +38,7 @@ public class ModeledFrameworkBuilder<T>
     private UnaryOperator<WatchedEvent> watcherFilter;
     private UnhandledErrorListener unhandledErrorListener;
     private UnaryOperator<CuratorEvent> resultFilter;
+    private Set<ModeledOptions> modeledOptions;
 
     /**
      * Build a new ModeledFramework instance
@@ -49,7 +53,8 @@ public class ModeledFrameworkBuilder<T>
             watchMode,
             watcherFilter,
             unhandledErrorListener,
-            resultFilter
+            resultFilter,
+            modeledOptions
         );
     }
 
@@ -142,6 +147,18 @@ public class ModeledFrameworkBuilder<T>
         return this;
     }
 
+    /**
+     * Change the modeled options
+     *
+     * @param modeledOptions new options set
+     * @return this for chaining
+     */
+    public ModeledFrameworkBuilder<T> withOptions(Set<ModeledOptions> modeledOptions)
+    {
+        this.modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "client cannot be null"));
+        return this;
+    }
+
     ModeledFrameworkBuilder()
     {
     }
@@ -150,5 +167,6 @@ public class ModeledFrameworkBuilder<T>
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");
         this.modelSpec = Objects.requireNonNull(modelSpec, "modelSpec cannot be null");
+        modeledOptions = Collections.singleton(ModeledOptions.ignoreMissingNodesForChildren);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
new file mode 100644
index 0000000..434894b
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+public enum ModeledOptions
+{
+    /**
+     * Causes {@link ModeledFramework#children()} and {@link ModeledFramework#childrenAsZNodes()}
+     * to ignore {@link org.apache.zookeeper.KeeperException.NoNodeException} and merely return
+     * an empty list
+     */
+    ignoreMissingNodesForChildren
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index c1d19c4..44011ee 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -19,6 +19,8 @@
 package org.apache.curator.x.async.modeled.details;
 
 import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -36,13 +38,16 @@ import org.apache.curator.x.async.api.CreateOption;
 import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
 import org.apache.curator.x.async.modeled.ModelSpec;
 import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ModeledOptions;
 import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
 import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
+import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
@@ -62,13 +67,15 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
     private final UnaryOperator<CuratorEvent> resultFilter;
     private final AsyncCuratorFrameworkDsl dslClient;
     private final boolean isWatched;
+    private final Set<ModeledOptions> modeledOptions;
 
-    public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter)
+    public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, Set<ModeledOptions> modeledOptions)
     {
         boolean isWatched = (watchMode != null);
 
         Objects.requireNonNull(client, "client cannot be null");
         Objects.requireNonNull(model, "model cannot be null");
+        modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null"));
 
         watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess;
 
@@ -84,11 +91,12 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
             watcherFilter,
             unhandledErrorListener,
             resultFilter,
-            isWatched
+            isWatched,
+            modeledOptions
         );
     }
 
-    private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched)
+    private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched, Set<ModeledOptions> modeledOptions)
     {
         this.client = client;
         this.dslClient = dslClient;
@@ -99,6 +107,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
         this.unhandledErrorListener = unhandledErrorListener;
         this.resultFilter = resultFilter;
         this.isWatched = isWatched;
+        this.modeledOptions = modeledOptions;
     }
 
     @Override
@@ -280,7 +289,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
         asyncStage.whenComplete((children, e) -> {
             if ( e != null )
             {
-                modelStage.completeExceptionally(e);
+                if ( modeledOptions.contains(ModeledOptions.ignoreMissingNodesForChildren) && (Throwables.getRootCause(e) instanceof KeeperException.NoNodeException) )
+                {
+                    modelStage.complete(Collections.emptyList());
+                }
+                else
+                {
+                    modelStage.completeExceptionally(e);
+                }
             }
             else
             {
@@ -303,7 +319,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
             watcherFilter,
             unhandledErrorListener,
             resultFilter,
-            isWatched
+            isWatched,
+            modeledOptions
         );
     }
 
@@ -320,7 +337,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
             watcherFilter,
             unhandledErrorListener,
             resultFilter,
-            isWatched
+            isWatched,
+            modeledOptions
         );
     }
 
@@ -337,7 +355,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
             watcherFilter,
             unhandledErrorListener,
             resultFilter,
-            isWatched
+            isWatched,
+            modeledOptions
         );
     }
 

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
new file mode 100644
index 0000000..84b21bf
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+
+public class InvalidMigrationSetException extends RuntimeException
+{
+    private final String migrationId;
+
+    public InvalidMigrationSetException(String migrationId, String message)
+    {
+        super(message);
+        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+    }
+
+    public String getMigrationId()
+    {
+        return migrationId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
new file mode 100644
index 0000000..c2878e2
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+
+public class MetaData
+{
+    private final String migrationId;
+    private final int migrationVersion;
+
+    public MetaData()
+    {
+        this("", 0);
+    }
+
+    public MetaData(String migrationId, int migrationVersion)
+    {
+        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+        this.migrationVersion = migrationVersion;
+    }
+
+    public String getMigrationId()
+    {
+        return migrationId;
+    }
+
+    public int getMigrationVersion()
+    {
+        return migrationVersion;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if ( this == o )
+        {
+            return true;
+        }
+        if ( o == null || getClass() != o.getClass() )
+        {
+            return false;
+        }
+
+        MetaData metaData = (MetaData)o;
+
+        //noinspection SimplifiableIfStatement
+        if ( migrationVersion != metaData.migrationVersion )
+        {
+            return false;
+        }
+        return migrationId.equals(metaData.migrationId);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        int result = migrationId.hashCode();
+        result = 31 * result + migrationVersion;
+        return result;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "MetaData{" + "migrationId='" + migrationId + '\'' + ", migrationVersion=" + migrationVersion + '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
new file mode 100644
index 0000000..b3919d1
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+
+public interface Migration
+{
+    String id();
+
+    int version();
+
+    byte[] migrate(byte[] previousBytes);
+
+    static Migration build(String id, int version, UnaryOperator<byte[]> migrateProc)
+    {
+        Objects.requireNonNull(id, "id cannot be null");
+        Objects.requireNonNull(migrateProc, "migrateProc cannot be null");
+        return new Migration()
+        {
+            @Override
+            public String id()
+            {
+                return id;
+            }
+
+            @Override
+            public int version()
+            {
+                return version;
+            }
+
+            @Override
+            public byte[] migrate(byte[] previousBytes)
+            {
+                return migrateProc.apply(previousBytes);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
new file mode 100644
index 0000000..2d5f39f
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+
+public interface MigrationManager
+{
+    CompletionStage<List<MetaData>> metaData(ZPath metaDataPath);
+
+    CompletionStage<Void> run();
+
+    static MigrationManagerBuilder builder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
+    {
+        return new MigrationManagerBuilder(client, lockPath, metaDataSerializer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
new file mode 100644
index 0000000..ed48242
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+public class MigrationManagerBuilder
+{
+    private final AsyncCuratorFramework client;
+    private final ZPath lockPath;
+    private final ModelSerializer<MetaData> metaDataSerializer;
+    private final List<MigrationSet> sets = new ArrayList<>();
+    private Executor executor = Runnable::run;
+    private Duration lockMax = Duration.ofSeconds(15);
+
+    MigrationManagerBuilder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+        this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
+    }
+
+    public MigrationManager build()
+    {
+        return new MigrationManagerImpl(client, lockPath, metaDataSerializer, executor, lockMax, sets);
+    }
+
+    public MigrationManagerBuilder withExecutor(Executor executor)
+    {
+        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+        return this;
+    }
+
+    public MigrationManagerBuilder withLockMax(Duration lockMax)
+    {
+        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+        return this;
+    }
+
+    public MigrationManagerBuilder adding(MigrationSet set)
+    {
+        sets.add(Objects.requireNonNull(set, "set cannot be null"));
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
new file mode 100644
index 0000000..15c61b2
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.CreateMode;
+import java.time.Duration;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+class MigrationManagerImpl implements MigrationManager
+{
+    private final AsyncCuratorFramework client;
+    private final ZPath lockPath;
+    private final ModelSerializer<MetaData> metaDataSerializer;
+    private final Executor executor;
+    private final Duration lockMax;
+    private final List<MigrationSet> sets;
+
+    private static final String META_DATA_NODE_NAME = "meta-";
+
+    MigrationManagerImpl(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax, List<MigrationSet> sets)
+    {
+        this.client = client;
+        this.lockPath = lockPath;
+        this.metaDataSerializer = metaDataSerializer;
+        this.executor = executor;
+        this.lockMax = lockMax;
+        this.sets = ImmutableList.copyOf(sets);
+    }
+
+    @Override
+    public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
+        return ZNode.models(modeled.childrenAsZNodes());
+    }
+
+    @Override
+    public CompletionStage<Void> run()
+    {
+        Map<String, CompletableFuture<Void>> futures = sets
+            .stream()
+            .map(m -> new AbstractMap.SimpleEntry<>(m.id(), runMigration(m).toCompletableFuture()))
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[futures.size()]));
+    }
+
+    private CompletionStage<Void> runMigration(MigrationSet set)
+    {
+        String lockPath = this.lockPath.child(set.id()).fullPath();
+        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
+        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
+        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+    }
+
+    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
+        return modeled.childrenAsZNodes()
+            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
+            .handle((v, e) -> {
+                release(lock, true);
+                if ( e != null )
+                {
+                    Throwables.propagate(e);
+                }
+                return v;
+            }
+        );
+    }
+
+    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
+    {
+        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+        return ModeledFramework.wrap(client, modelSpec);
+    }
+
+    protected void checkIsValid(MigrationSet set, List<MetaData> sortedMetaData) throws InvalidMigrationSetException
+    {
+        if ( sortedMetaData.size() > set.migrations().size() )
+        {
+            throw new InvalidMigrationSetException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+        }
+
+        int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
+        List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
+            .stream()
+            .map(m -> new MetaData(m.id(), m.version()))
+            .collect(Collectors.toList());
+        if ( !compareMigrations.equals(sortedMetaData) )
+        {
+            throw new InvalidMigrationSetException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+        }
+    }
+
+    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
+    {
+        List<MetaData> sortedMetaData = metaDataNodes
+            .stream()
+            .sorted(Comparator.comparing(m -> m.path().fullPath()))
+            .map(ZNode::model)
+            .collect(Collectors.toList());
+        try
+        {
+            checkIsValid(set, sortedMetaData);
+        }
+        catch ( InvalidMigrationSetException e )
+        {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+
+        List<Migration> toBeApplied = set.migrations().subList(sortedMetaData.size(), set.migrations().size());
+        if ( toBeApplied.size() == 0 )
+        {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
+            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, metaDataClient));
+    }
+
+    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
+    {
+        ModelSpec<byte[]> modelSpec = ModelSpec.builder(set.path(), ModelSerializer.raw).build();
+        ModeledFramework<byte[]> modeled = ModeledFramework.wrap(client, modelSpec);
+        return modeled.childrenAsZNodes().thenCompose(nodes -> {
+            List<CuratorOp> operations = new ArrayList<>();
+            for ( ZNode<byte[]> node : nodes )
+            {
+                byte[] currentBytes = node.model();
+                for ( Migration migration : toBeApplied )
+                {
+                    currentBytes = migration.migrate(currentBytes);
+                    MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+                    operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
+                }
+                operations.add(modeled.child(node.path().nodeName()).updateOp(currentBytes, node.stat().getVersion()));
+            }
+            return client.transaction().forOperations(operations).thenApply(__ -> null);
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
new file mode 100644
index 0000000..9e41989
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.util.List;
+import java.util.Objects;
+
+public interface MigrationSet
+{
+    String id();
+
+    ZPath path();
+
+    ZPath metaDataPath();
+
+    List<Migration> migrations();
+
+    static MigrationSet build(String id, ZPath path, ZPath metaDataPath, List<Migration> migrations)
+    {
+        Objects.requireNonNull(id, "id cannot be null");
+        Objects.requireNonNull(path, "path cannot be null");
+        Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
+        final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
+        return new MigrationSet()
+        {
+            @Override
+            public String id()
+            {
+                return id;
+            }
+
+            @Override
+            public ZPath path()
+            {
+                return path;
+            }
+
+            @Override
+            public ZPath metaDataPath()
+            {
+                return metaDataPath;
+            }
+
+            @Override
+            public List<Migration> migrations()
+            {
+                return migrationsCopy;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
new file mode 100644
index 0000000..4fcfeac
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+public interface Result
+{
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
index 232d301..4a964b1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async;
 
+import com.google.common.base.Throwables;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.Timing;
 import org.testng.Assert;
@@ -33,7 +34,12 @@ public abstract class CompletableBaseClassForTests extends BaseClassForTests
 
     protected <T, U> void complete(CompletionStage<T> stage)
     {
-        complete(stage, (v, e) -> {});
+        complete(stage, (v, e) -> {
+            if ( e != null )
+            {
+                Throwables.propagate(e);
+            }
+        });
     }
 
     protected <T, U> void complete(CompletionStage<T> stage, BiConsumer<? super T, Throwable> handler)

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
new file mode 100644
index 0000000..d709abe
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.modeled.JacksonModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.migrations.models.ModelV1;
+import org.apache.curator.x.async.modeled.migrations.models.ModelV2;
+import org.apache.curator.x.async.modeled.migrations.models.ModelV3;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.function.UnaryOperator;
+
+public class TestMigrationManager extends CompletableBaseClassForTests
+{
+    private AsyncCuratorFramework client;
+    private MigrationSet migrationSet;
+    private ModelSpec<ModelV1> v1Spec;
+    private ModelSpec<ModelV2> v2Spec;
+    private ModelSpec<ModelV3> v3Spec;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        client.start();
+
+        this.client = AsyncCuratorFramework.wrap(client);
+
+        ObjectMapper mapper = new ObjectMapper();
+        UnaryOperator<byte[]> from1to2 = bytes -> {
+            try
+            {
+                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
+                ModelV2 v2 = new ModelV2(v1.getName(), 64);
+                return mapper.writeValueAsBytes(v2);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        UnaryOperator<byte[]> from2to3 = bytes -> {
+            try
+            {
+                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
+                String[] nameParts = v2.getName().split("\\s");
+                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
+                return mapper.writeValueAsBytes(v3);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        ZPath modelPath = ZPath.parse("/test/it");
+
+        Migration m1 = Migration.build("1",1, from1to2);
+        Migration m2 = Migration.build("2",1, from2to3);
+        migrationSet = MigrationSet.build("1", modelPath, ZPath.parse("/metadata"), Arrays.asList(m1, m2));
+
+        v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
+        v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
+        v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        CloseableUtils.closeQuietly(client.unwrap());
+        super.teardown();
+    }
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
+        ModelV1 v1 = new ModelV1("John Galt");
+        complete(v1Client.child("1").set(v1));
+
+        MigrationManager manager = MigrationManager.builder(this.client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class))
+            .adding(migrationSet)
+            .build();
+
+        complete(manager.run());
+
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.child("1").read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 64);
+            Assert.assertEquals(m.getFirstName(), "John");
+            Assert.assertEquals(m.getLastName(), "Galt");
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
new file mode 100644
index 0000000..02b13b7
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations.models;
+
+public class ModelV1
+{
+    private final String name;
+
+    public ModelV1()
+    {
+        this("");
+    }
+
+    public ModelV1(String name)
+    {
+        this.name = name;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
new file mode 100644
index 0000000..bd77a2e
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations.models;
+
+public class ModelV2
+{
+    private final String name;
+    private final int age;
+
+    public ModelV2()
+    {
+        this("", 0);
+    }
+
+    public ModelV2(String name, int age)
+    {
+        this.name = name;
+        this.age = age;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    public int getAge()
+    {
+        return age;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
new file mode 100644
index 0000000..d4713b8
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations.models;
+
+public class ModelV3
+{
+    private final String firstName;
+    private final String lastName;
+    private final int age;
+
+    public ModelV3()
+    {
+        this("", "", 0);
+    }
+
+    public ModelV3(String firstName, String lastName, int age)
+    {
+        this.firstName = firstName;
+        this.lastName = lastName;
+        this.age = age;
+    }
+
+    public String getFirstName()
+    {
+        return firstName;
+    }
+
+    public String getLastName()
+    {
+        return lastName;
+    }
+
+    public int getAge()
+    {
+        return age;
+    }
+}


[16/23] curator git commit: more tests

Posted by ra...@apache.org.
more tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/75118e43
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/75118e43
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/75118e43

Branch: refs/heads/master
Commit: 75118e43b165d2e99a432161cee6a3dab55e3e4e
Parents: f7e728b
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 17:15:32 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 17:15:32 2017 -0500

----------------------------------------------------------------------
 .../async/migrations/TestMigrationManager.java  | 50 ++++++++++++++++++--
 1 file changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/75118e43/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 3522911..80a03bb 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -33,6 +33,7 @@ import org.apache.curator.x.async.modeled.JacksonModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
 import org.apache.curator.x.async.modeled.ModeledFramework;
 import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -94,7 +95,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     }
 
     @Test
-    public void testBasic() throws Exception
+    public void testBasic()
     {
         Migration m1 = () -> Arrays.asList(v1opA, v1opB);
         Migration m2 = () -> Collections.singletonList(v2op);
@@ -116,7 +117,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     }
 
     @Test
-    public void testStaged() throws Exception
+    public void testStaged()
     {
         Migration m1 = () -> Arrays.asList(v1opA, v1opB);
         MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(m1));
@@ -174,7 +175,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     }
 
     @Test
-    public void testChecksumDataError() throws Exception
+    public void testChecksumDataError()
     {
         CuratorOp op1 = client.transactionOp().create().forPath("/test");
         CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
@@ -197,7 +198,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     }
 
     @Test
-    public void testChecksumPathError() throws Exception
+    public void testChecksumPathError()
     {
         CuratorOp op1 = client.transactionOp().create().forPath("/test2");
         CuratorOp op2 = client.transactionOp().create().forPath("/test2/bar");
@@ -218,4 +219,45 @@ public class TestMigrationManager extends CompletableBaseClassForTests
             Assert.assertTrue(Throwables.getRootCause(e) instanceof MigrationException);
         }
     }
+
+    @Test
+    public void testPartialApplyForBadOps() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test", "something".getBytes());
+        CuratorOp op2 = client.transactionOp().create().forPath("/a/b/c");
+        Migration m1 = () -> Collections.singletonList(op1);
+        Migration m2 = () -> Collections.singletonList(op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2));
+        try
+        {
+            complete(manager.migrate(migrationSet));
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof KeeperException.NoNodeException);
+        }
+
+        Assert.assertEquals(client.unwrap().getData().forPath("/test"), "something".getBytes());
+    }
+
+    @Test
+    public void testTransactionForBadOps() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test2", "something".getBytes());
+        CuratorOp op2 = client.transactionOp().create().forPath("/a/b/c/d");
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        try
+        {
+            complete(manager.migrate(migrationSet));
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof KeeperException.NoNodeException);
+        }
+
+        Assert.assertNull(client.unwrap().checkExists().forPath("/test"));
+    }
 }


[08/23] curator git commit: added some doc

Posted by ra...@apache.org.
added some doc


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4f12abcd
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4f12abcd
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4f12abcd

Branch: refs/heads/master
Commit: 4f12abcdc0c61cae52099f47f158433ebe09326c
Parents: ecbd386
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 10:48:47 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 10:48:47 2017 -0500

----------------------------------------------------------------------
 .../x/async/modeled/migrations/MetaData.java    |  9 ++++
 .../x/async/modeled/migrations/Migration.java   | 17 +++++++
 .../modeled/migrations/MigrationManager.java    | 48 +++++++++++++++++---
 .../async/modeled/migrations/MigrationSet.java  | 13 ++++++
 4 files changed, 81 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/4f12abcd/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
index c2878e2..da40a5b 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
@@ -20,6 +20,9 @@ package org.apache.curator.x.async.modeled.migrations;
 
 import java.util.Objects;
 
+/**
+ * The meta data of a single migration
+ */
 public class MetaData
 {
     private final String migrationId;
@@ -36,11 +39,17 @@ public class MetaData
         this.migrationVersion = migrationVersion;
     }
 
+    /**
+     * @return The ID of the migration that was applied
+     */
     public String getMigrationId()
     {
         return migrationId;
     }
 
+    /**
+     * @return the version of the migration that was applied
+     */
     public int getMigrationVersion()
     {
         return migrationVersion;

http://git-wip-us.apache.org/repos/asf/curator/blob/4f12abcd/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index 63a7a7d..b456580 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -23,14 +23,31 @@ import java.util.List;
 import java.util.Objects;
 import java.util.function.Supplier;
 
+/**
+ * Models a single migration/transition
+ */
 public interface Migration
 {
+    /**
+     * @return the unique ID for this migration
+     */
     String id();
 
+    /**
+     * @return the version of this migration
+     */
     int version();
 
+    /**
+     * @return the operations to execute in a transaction
+     */
     List<CuratorOp> operations();
 
+    static Migration build(String id, Supplier<List<CuratorOp>> operationsProc)
+    {
+        return build(id, 1, operationsProc);
+    }
+
     static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
     {
         Objects.requireNonNull(id, "id cannot be null");

http://git-wip-us.apache.org/repos/asf/curator/blob/4f12abcd/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index bfb9707..01de2f8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -42,6 +42,9 @@ import java.util.stream.Collectors;
 
 import static org.apache.curator.x.async.AsyncWrappers.*;
 
+/**
+ * Manages migrations
+ */
 public class MigrationManager
 {
     private final AsyncCuratorFramework client;
@@ -52,6 +55,16 @@ public class MigrationManager
 
     private static final String META_DATA_NODE_NAME = "meta-";
 
+    /**
+     * Jackson usage: See the note in {@link org.apache.curator.x.async.modeled.JacksonModelSerializer} regarding how the Jackson library is specified in Curator's Maven file.
+     * Unless you are not using Jackson pass <code>JacksonModelSerializer.build(MetaData.class)</code> for <code>metaDataSerializer</code>
+     *
+     * @param client the curator client
+     * @param lockPath base path for locks used by the manager
+     * @param metaDataSerializer JacksonModelSerializer.build(MetaData.class)
+     * @param executor the executor to use
+     * @param lockMax max time to wait for locks
+     */
     public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");
@@ -61,12 +74,13 @@ public class MigrationManager
         this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
     }
 
-    public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
-        return ZNode.models(modeled.childrenAsZNodes());
-    }
-
+    /**
+     * Process the given migration set
+     *
+     * @param set the set
+     * @return completion stage. If there is a migration-specific error, the stage will be completed
+     * exceptionally with {@link org.apache.curator.x.async.modeled.migrations.MigrationException}.
+     */
     public CompletionStage<Void> migrate(MigrationSet set)
     {
         String lockPath = this.lockPath.child(set.id()).fullPath();
@@ -75,6 +89,28 @@ public class MigrationManager
         return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
     }
 
+    /**
+     * Utility to return the meta data from previous migrations
+     *
+     * @param set the set
+     * @return stage
+     */
+    public CompletionStage<List<MetaData>> metaData(MigrationSet set)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
+        return ZNode.models(modeled.childrenAsZNodes());
+    }
+
+    /**
+     * Can be overridden to change how the comparison to previous migrations is done. The default
+     * version ensures that the meta data from previous migrations matches the current migration
+     * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
+     *
+     * @param set the migration set being applied
+     * @param sortedMetaData previous migration meta data (may be empty)
+     * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
+     * @throws MigrationException errors
+     */
     protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
     {
         if ( sortedMetaData.size() > set.migrations().size() )

http://git-wip-us.apache.org/repos/asf/curator/blob/4f12abcd/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
index 0a0dbe0..c4cd90e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
@@ -23,12 +23,25 @@ import org.apache.curator.x.async.modeled.ZPath;
 import java.util.List;
 import java.util.Objects;
 
+/**
+ * Models a set of migrations. Each individual migration is applied
+ * in a transaction.
+ */
 public interface MigrationSet
 {
+    /**
+     * @return the unique ID for this migration set
+     */
     String id();
 
+    /**
+     * @return where to store the meta data for this migration set
+     */
     ZPath metaDataPath();
 
+    /**
+     * @return list of migrations in the order that they should be applied
+     */
     List<Migration> migrations();
 
     static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)


[04/23] curator git commit: reworking so that this feature is more general. Now manages any set of transactions

Posted by ra...@apache.org.
reworking so that this feature is more general. Now manages any set of transactions


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a15582b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a15582b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a15582b

Branch: refs/heads/master
Commit: 1a15582b59ccfaf80b1547fd57bcb7dfe894b1c8
Parents: 1a0c162
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 09:24:33 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 09:24:33 2017 -0500

----------------------------------------------------------------------
 .../InvalidMigrationSetException.java           |  37 ----
 .../x/async/modeled/migrations/Migration.java   |  14 +-
 .../modeled/migrations/MigrationException.java  |  37 ++++
 .../modeled/migrations/MigrationManager.java    | 132 +++++++++++++-
 .../migrations/MigrationManagerBuilder.java     |  68 -------
 .../migrations/MigrationManagerImpl.java        | 181 -------------------
 .../async/modeled/migrations/MigrationSet.java  |  11 +-
 .../migrations/TestMigrationManager.java        |  35 ++--
 8 files changed, 196 insertions(+), 319 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
deleted file mode 100644
index 84b21bf..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import java.util.Objects;
-
-public class InvalidMigrationSetException extends RuntimeException
-{
-    private final String migrationId;
-
-    public InvalidMigrationSetException(String migrationId, String message)
-    {
-        super(message);
-        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
-    }
-
-    public String getMigrationId()
-    {
-        return migrationId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index b3919d1..63a7a7d 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -18,8 +18,10 @@
  */
 package org.apache.curator.x.async.modeled.migrations;
 
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import java.util.List;
 import java.util.Objects;
-import java.util.function.UnaryOperator;
+import java.util.function.Supplier;
 
 public interface Migration
 {
@@ -27,12 +29,12 @@ public interface Migration
 
     int version();
 
-    byte[] migrate(byte[] previousBytes);
+    List<CuratorOp> operations();
 
-    static Migration build(String id, int version, UnaryOperator<byte[]> migrateProc)
+    static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
     {
         Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(migrateProc, "migrateProc cannot be null");
+        Objects.requireNonNull(operationsProc, "operationsProc cannot be null");
         return new Migration()
         {
             @Override
@@ -48,9 +50,9 @@ public interface Migration
             }
 
             @Override
-            public byte[] migrate(byte[] previousBytes)
+            public List<CuratorOp> operations()
             {
-                return migrateProc.apply(previousBytes);
+                return operationsProc.get();
             }
         };
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
new file mode 100644
index 0000000..1a1d59c
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+
+public class MigrationException extends RuntimeException
+{
+    private final String migrationId;
+
+    public MigrationException(String migrationId, String message)
+    {
+        super(message);
+        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+    }
+
+    public String getMigrationId()
+    {
+        return migrationId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 2d5f39f..47adb1e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -18,20 +18,142 @@
  */
 package org.apache.curator.x.async.modeled.migrations;
 
+import com.google.common.base.Throwables;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.CreateMode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
-public interface MigrationManager
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+public class MigrationManager
 {
-    CompletionStage<List<MetaData>> metaData(ZPath metaDataPath);
+    private final AsyncCuratorFramework client;
+    private final ZPath lockPath;
+    private final ModelSerializer<MetaData> metaDataSerializer;
+    private final Executor executor;
+    private final Duration lockMax;
+
+    private static final String META_DATA_NODE_NAME = "meta-";
+
+    public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+        this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
+        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+    }
+
+    public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
+        return ZNode.models(modeled.childrenAsZNodes());
+    }
+
+    public CompletionStage<Void> migrate(MigrationSet set)
+    {
+        String lockPath = this.lockPath.child(set.id()).fullPath();
+        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
+        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
+        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+    }
+
+    protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
+    {
+        if ( sortedMetaData.size() > set.migrations().size() )
+        {
+            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+        }
 
-    CompletionStage<Void> run();
+        int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
+        List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
+            .stream()
+            .map(m -> new MetaData(m.id(), m.version()))
+            .collect(Collectors.toList());
+        if ( !compareMigrations.equals(sortedMetaData) )
+        {
+            throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+        }
+        return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
+    }
+
+    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
+        return modeled.childrenAsZNodes()
+            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
+            .handle((v, e) -> {
+                release(lock, true);
+                if ( e != null )
+                {
+                    Throwables.propagate(e);
+                }
+                return v;
+            }
+        );
+    }
+
+    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
+    {
+        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+        return ModeledFramework.wrap(client, modelSpec);
+    }
+
+    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
+    {
+        List<MetaData> sortedMetaData = metaDataNodes
+            .stream()
+            .sorted(Comparator.comparing(m -> m.path().fullPath()))
+            .map(ZNode::model)
+            .collect(Collectors.toList());
+
+        List<Migration> toBeApplied;
+        try
+        {
+            toBeApplied = filter(set, sortedMetaData);
+        }
+        catch ( MigrationException e )
+        {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+
+        if ( toBeApplied.size() == 0 )
+        {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
+            .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient));
+    }
 
-    static MigrationManagerBuilder builder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
+    private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
     {
-        return new MigrationManagerBuilder(client, lockPath, metaDataSerializer);
+        List<CuratorOp> operations = new ArrayList<>();
+        for ( Migration migration : toBeApplied )
+        {
+            operations.addAll(migration.operations());
+            MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+            operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
+        }
+        return client.transaction().forOperations(operations).thenApply(__ -> null);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
deleted file mode 100644
index ed48242..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ZPath;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Executor;
-
-public class MigrationManagerBuilder
-{
-    private final AsyncCuratorFramework client;
-    private final ZPath lockPath;
-    private final ModelSerializer<MetaData> metaDataSerializer;
-    private final List<MigrationSet> sets = new ArrayList<>();
-    private Executor executor = Runnable::run;
-    private Duration lockMax = Duration.ofSeconds(15);
-
-    MigrationManagerBuilder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
-    {
-        this.client = Objects.requireNonNull(client, "client cannot be null");
-        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
-        this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
-    }
-
-    public MigrationManager build()
-    {
-        return new MigrationManagerImpl(client, lockPath, metaDataSerializer, executor, lockMax, sets);
-    }
-
-    public MigrationManagerBuilder withExecutor(Executor executor)
-    {
-        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
-        return this;
-    }
-
-    public MigrationManagerBuilder withLockMax(Duration lockMax)
-    {
-        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
-        return this;
-    }
-
-    public MigrationManagerBuilder adding(MigrationSet set)
-    {
-        sets.add(Objects.requireNonNull(set, "set cannot be null"));
-        return this;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
deleted file mode 100644
index 15c61b2..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZNode;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.CreateMode;
-import java.time.Duration;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.curator.x.async.AsyncWrappers.*;
-
-class MigrationManagerImpl implements MigrationManager
-{
-    private final AsyncCuratorFramework client;
-    private final ZPath lockPath;
-    private final ModelSerializer<MetaData> metaDataSerializer;
-    private final Executor executor;
-    private final Duration lockMax;
-    private final List<MigrationSet> sets;
-
-    private static final String META_DATA_NODE_NAME = "meta-";
-
-    MigrationManagerImpl(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax, List<MigrationSet> sets)
-    {
-        this.client = client;
-        this.lockPath = lockPath;
-        this.metaDataSerializer = metaDataSerializer;
-        this.executor = executor;
-        this.lockMax = lockMax;
-        this.sets = ImmutableList.copyOf(sets);
-    }
-
-    @Override
-    public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
-        return ZNode.models(modeled.childrenAsZNodes());
-    }
-
-    @Override
-    public CompletionStage<Void> run()
-    {
-        Map<String, CompletableFuture<Void>> futures = sets
-            .stream()
-            .map(m -> new AbstractMap.SimpleEntry<>(m.id(), runMigration(m).toCompletableFuture()))
-            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
-        return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[futures.size()]));
-    }
-
-    private CompletionStage<Void> runMigration(MigrationSet set)
-    {
-        String lockPath = this.lockPath.child(set.id()).fullPath();
-        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
-        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
-        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
-    }
-
-    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
-        return modeled.childrenAsZNodes()
-            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
-            .handle((v, e) -> {
-                release(lock, true);
-                if ( e != null )
-                {
-                    Throwables.propagate(e);
-                }
-                return v;
-            }
-        );
-    }
-
-    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
-    {
-        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
-        return ModeledFramework.wrap(client, modelSpec);
-    }
-
-    protected void checkIsValid(MigrationSet set, List<MetaData> sortedMetaData) throws InvalidMigrationSetException
-    {
-        if ( sortedMetaData.size() > set.migrations().size() )
-        {
-            throw new InvalidMigrationSetException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
-        }
-
-        int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
-        List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
-            .stream()
-            .map(m -> new MetaData(m.id(), m.version()))
-            .collect(Collectors.toList());
-        if ( !compareMigrations.equals(sortedMetaData) )
-        {
-            throw new InvalidMigrationSetException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
-        }
-    }
-
-    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
-    {
-        List<MetaData> sortedMetaData = metaDataNodes
-            .stream()
-            .sorted(Comparator.comparing(m -> m.path().fullPath()))
-            .map(ZNode::model)
-            .collect(Collectors.toList());
-        try
-        {
-            checkIsValid(set, sortedMetaData);
-        }
-        catch ( InvalidMigrationSetException e )
-        {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(e);
-            return future;
-        }
-
-        List<Migration> toBeApplied = set.migrations().subList(sortedMetaData.size(), set.migrations().size());
-        if ( toBeApplied.size() == 0 )
-        {
-            return CompletableFuture.completedFuture(null);
-        }
-
-        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
-            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, metaDataClient));
-    }
-
-    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
-    {
-        ModelSpec<byte[]> modelSpec = ModelSpec.builder(set.path(), ModelSerializer.raw).build();
-        ModeledFramework<byte[]> modeled = ModeledFramework.wrap(client, modelSpec);
-        return modeled.childrenAsZNodes().thenCompose(nodes -> {
-            List<CuratorOp> operations = new ArrayList<>();
-            for ( ZNode<byte[]> node : nodes )
-            {
-                byte[] currentBytes = node.model();
-                for ( Migration migration : toBeApplied )
-                {
-                    currentBytes = migration.migrate(currentBytes);
-                    MetaData thisMetaData = new MetaData(migration.id(), migration.version());
-                    operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
-                }
-                operations.add(modeled.child(node.path().nodeName()).updateOp(currentBytes, node.stat().getVersion()));
-            }
-            return client.transaction().forOperations(operations).thenApply(__ -> null);
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
index 9e41989..0a0dbe0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
@@ -27,16 +27,13 @@ public interface MigrationSet
 {
     String id();
 
-    ZPath path();
-
     ZPath metaDataPath();
 
     List<Migration> migrations();
 
-    static MigrationSet build(String id, ZPath path, ZPath metaDataPath, List<Migration> migrations)
+    static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
     {
         Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(path, "path cannot be null");
         Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
         final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
         return new MigrationSet()
@@ -48,12 +45,6 @@ public interface MigrationSet
             }
 
             @Override
-            public ZPath path()
-            {
-                return path;
-            }
-
-            @Override
             public ZPath metaDataPath()
             {
                 return metaDataPath;

http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index d709abe..daf69cd 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.migrations;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -37,7 +38,11 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.UnaryOperator;
 
 public class TestMigrationManager extends CompletableBaseClassForTests
@@ -47,6 +52,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     private ModelSpec<ModelV1> v1Spec;
     private ModelSpec<ModelV2> v2Spec;
     private ModelSpec<ModelV3> v3Spec;
+    private ExecutorService executor;
 
     @BeforeMethod
     @Override
@@ -54,10 +60,10 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     {
         super.setup();
 
-        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
-        client.start();
+        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        rawClient.start();
 
-        this.client = AsyncCuratorFramework.wrap(client);
+        this.client = AsyncCuratorFramework.wrap(rawClient);
 
         ObjectMapper mapper = new ObjectMapper();
         UnaryOperator<byte[]> from1to2 = bytes -> {
@@ -89,13 +95,20 @@ public class TestMigrationManager extends CompletableBaseClassForTests
 
         ZPath modelPath = ZPath.parse("/test/it");
 
-        Migration m1 = Migration.build("1",1, from1to2);
-        Migration m2 = Migration.build("2",1, from2to3);
-        migrationSet = MigrationSet.build("1", modelPath, ZPath.parse("/metadata"), Arrays.asList(m1, m2));
-
         v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
         v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
         v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+
+        CuratorOp v1op = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+        CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+        CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
+
+        Migration m1 = Migration.build("1",1, () -> Collections.singletonList(v1op));
+        Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+        Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
+
+        executor = Executors.newCachedThreadPool();
     }
 
     @AfterMethod
@@ -103,6 +116,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     public void teardown() throws Exception
     {
         CloseableUtils.closeQuietly(client.unwrap());
+        executor.shutdownNow();
         super.teardown();
     }
 
@@ -113,11 +127,8 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         ModelV1 v1 = new ModelV1("John Galt");
         complete(v1Client.child("1").set(v1));
 
-        MigrationManager manager = MigrationManager.builder(this.client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class))
-            .adding(migrationSet)
-            .build();
-
-        complete(manager.run());
+        MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+        complete(manager.migrate(migrationSet));
 
         ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
         complete(v3Client.child("1").read(), (m, e) -> {


[14/23] curator git commit: give Migrations a link in the left menu of help

Posted by ra...@apache.org.
 give Migrations a link in the left menu of help


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/909ed9ae
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/909ed9ae
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/909ed9ae

Branch: refs/heads/master
Commit: 909ed9aeddc034a9a59868db22847801ea25b4ec
Parents: 72ff5a9
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 14:37:09 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 14:37:09 2017 -0500

----------------------------------------------------------------------
 curator-x-async/src/site/site.xml | 8 +++++---
 src/site/site.xml                 | 1 +
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/909ed9ae/curator-x-async/src/site/site.xml
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/site.xml b/curator-x-async/src/site/site.xml
index fc0a67a..70b5d4b 100644
--- a/curator-x-async/src/site/site.xml
+++ b/curator-x-async/src/site/site.xml
@@ -23,15 +23,17 @@
     <body>
         <head>
             <link rel="stylesheet" href="../css/site.css" />
-            <script type="text/javascript">
+            <script type="text/javascript"><![CDATA[
                 $(function(){
-                    if ( location &amp;&amp; location.pathname &amp;&amp; (location.pathname.endsWith('/index.html') || location.pathname.endsWith('/migrations.html')) ) {
+                    if ( location && location.pathname && location.pathname.endsWith('/index.html') ) {
                         $('a[title="Java 8/Async"]').parent().addClass("active");
+                    } else if ( location && location.pathname && location.pathname.endsWith('/migrations.html') ) {
+                        $('a[title="Migrations"]').parent().addClass("active");
                     } else {
                         $('a[title="Strongly Typed Models"]').parent().addClass("active");
                     }
                 });
-            </script>
+            ]]></script>
         </head>
     </body>
 </project>

http://git-wip-us.apache.org/repos/asf/curator/blob/909ed9ae/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
index 83ddd46..4b1ac98 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -75,6 +75,7 @@
             <item name="Client" href="curator-client/index.html"/>
             <item name="Java 8/Async" href="curator-x-async/index.html"/>
             <item name="Strongly Typed Models" href="curator-x-async/modeled.html"/>
+            <item name="Migrations" href="curator-x-async/migrations.html"/>
             <item name="Schema Support" href="curator-framework/schema.html"/>
         </menu>
 


[18/23] curator git commit: Added asyncEnsureParents()

Posted by ra...@apache.org.
Added asyncEnsureParents()


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9d30a8c8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9d30a8c8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9d30a8c8

Branch: refs/heads/master
Commit: 9d30a8c8769f803715862eb9b5479057bfd0d5af
Parents: 9385d04
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:02:30 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:02:30 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/x/async/AsyncWrappers.java   | 36 ++++++++++++++------
 1 file changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9d30a8c8/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index d7b3cc3..f26b3b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -28,7 +28,6 @@ import org.apache.zookeeper.KeeperException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
@@ -134,8 +133,19 @@ public class AsyncWrappers
     }
 
     /**
-     * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
-     * the given executor
+     * Asynchronously ensure that the parents of the given path are created
+     *
+     * @param client client
+     * @param path path to ensure
+     * @return stage
+     */
+    public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path)
+    {
+        return ensure(client, path, ExistsOption.createParentsIfNeeded);
+    }
+
+    /**
+     * Asynchronously ensure that the parents of the given path are created as containers
      *
      * @param client client
      * @param path path to ensure
@@ -143,14 +153,7 @@ public class AsyncWrappers
      */
     public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
     {
-        String localPath = ZKPaths.makePath(path, "foo");
-        Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
-        return client
-            .checkExists()
-            .withOptions(options)
-            .forPath(localPath)
-            .thenApply(__ -> null)
-            ;
+        return ensure(client, path, ExistsOption.createParentsAsContainers);
     }
 
     /**
@@ -373,6 +376,17 @@ public class AsyncWrappers
         });
     }
 
+    private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option)
+    {
+        String localPath = ZKPaths.makePath(path, "foo");
+        return client
+            .checkExists()
+            .withOptions(Collections.singleton(option))
+            .forPath(localPath)
+            .thenApply(__ -> null)
+            ;
+    }
+
     private AsyncWrappers()
     {
     }


[21/23] curator git commit: The entire migration set should be 1 transaction - not each inidividual migration

Posted by ra...@apache.org.
The entire migration set should be 1 transaction - not each inidividual migration


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/33e41388
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/33e41388
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/33e41388

Branch: refs/heads/master
Commit: 33e4138840904635a1793084051fd50b643794f1
Parents: c77ef82
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 18 08:10:43 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 18 08:10:43 2017 -0500

----------------------------------------------------------------------
 .../x/async/migrations/MigrationManager.java     | 19 +++++++++----------
 .../src/site/confluence/migrations.confluence    |  4 +++-
 .../x/async/migrations/TestMigrationManager.java |  2 +-
 3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
index 56e7f04..e51f0e4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -106,7 +106,6 @@ public class MigrationManager
         }
 
         int compareSize = Math.min(set.migrations().size(), operationHashesInOrder.size());
-        List<Migration> subList = set.migrations().subList(0, compareSize);
         for ( int i = 0; i < compareSize; ++i )
         {
             byte[] setHash = hash(set.migrations().get(i).operations());
@@ -184,23 +183,23 @@ public class MigrationManager
         }
 
         return asyncEnsureContainers(client, thisMetaDataPath)
-            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath));
+            .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, thisMetaDataPath));
     }
 
     @VisibleForTesting
     volatile AtomicInteger debugCount = null;
 
-    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath)
+    private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, String thisMetaDataPath)
     {
         debugCount.incrementAndGet();
 
+        List<CuratorOp> operations = new ArrayList<>();
         String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME);
-        List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
-            List<CuratorOp> operations = new ArrayList<>();
-            operations.addAll(migration.operations());
-            operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(operations)));
-            return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
-        }).collect(Collectors.toList());
-        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
+        toBeApplied.forEach(migration -> {
+            List<CuratorOp> thisMigrationOperations = migration.operations();
+            operations.addAll(thisMigrationOperations);
+            operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(thisMigrationOperations)));
+        });
+        return client.transaction().forOperations(operations).thenApply(__ -> null);
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/site/confluence/migrations.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence
index 4775fac..bd2d36f 100644
--- a/curator-x-async/src/site/confluence/migrations.confluence
+++ b/curator-x-async/src/site/confluence/migrations.confluence
@@ -91,7 +91,9 @@ manager.migrate(set).exceptionally(e -> {
 });
 {code}
 
-Each migration in the set is applied in a transaction. MigrationManager stores a hash
+Each migration in the set is applied in a single transaction - i.e. all operations that comprise
+a migration set (the sum of all individual migration operations) are sent to ZooKeeper as a single
+transaction. MigrationManager stores a hash
 of all operations in a migration so that it can be compared for future operations. i.e.
 if, in the future, a migration set is attempted but the hash of one of the previous migrations
 does not match, the stage completes exceptionally with {{MigrationException}}.

http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 786e704..47d09ab 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -268,7 +268,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
             Assert.assertTrue(Throwables.getRootCause(e) instanceof KeeperException.NoNodeException);
         }
 
-        Assert.assertEquals(client.unwrap().getData().forPath("/test"), "something".getBytes());
+        Assert.assertNull(client.unwrap().checkExists().forPath("/test"));  // should be all or nothing
     }
 
     @Test


[05/23] curator git commit: new generalization complete with initial test

Posted by ra...@apache.org.
new generalization complete with initial test


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c3adc953
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c3adc953
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c3adc953

Branch: refs/heads/master
Commit: c3adc95315413ee5c0be861b528e80a380444e5b
Parents: 1a15582
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 09:36:51 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 09:36:51 2017 -0500

----------------------------------------------------------------------
 .../async/modeled/migrations/MigrationManager.java | 10 +++++-----
 .../modeled/migrations/TestMigrationManager.java   | 17 +++++++----------
 2 files changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c3adc953/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 47adb1e..e59b7bf 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -147,13 +147,13 @@ public class MigrationManager
 
     private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
     {
-        List<CuratorOp> operations = new ArrayList<>();
-        for ( Migration migration : toBeApplied )
-        {
+        List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
+            List<CuratorOp> operations = new ArrayList<>();
             operations.addAll(migration.operations());
             MetaData thisMetaData = new MetaData(migration.id(), migration.version());
             operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
-        }
-        return client.transaction().forOperations(operations).thenApply(__ -> null);
+            return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
+        }).collect(Collectors.toList());
+        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c3adc953/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index daf69cd..9fcd53c 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -99,11 +99,12 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
         v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
 
-        CuratorOp v1op = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+        CuratorOp v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
+        CuratorOp v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
         CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
         CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
 
-        Migration m1 = Migration.build("1",1, () -> Collections.singletonList(v1op));
+        Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
         Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
         Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
         migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
@@ -123,18 +124,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     @Test
     public void testBasic() throws Exception
     {
-        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
-        ModelV1 v1 = new ModelV1("John Galt");
-        complete(v1Client.child("1").set(v1));
-
         MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
         complete(manager.migrate(migrationSet));
 
         ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
-        complete(v3Client.child("1").read(), (m, e) -> {
-            Assert.assertEquals(m.getAge(), 64);
-            Assert.assertEquals(m.getFirstName(), "John");
-            Assert.assertEquals(m.getLastName(), "Galt");
+        complete(v3Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
         });
     }
 }


[11/23] curator git commit: major refactoring and simplification. No longer dependent on any modeled code so it's moved to the parent async package

Posted by ra...@apache.org.
major refactoring and simplification. No longer dependent on any modeled code so it's moved to the parent async package


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d80651a7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d80651a7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d80651a7

Branch: refs/heads/master
Commit: d80651a7a276b0ce999601c30e205aceaae94d4c
Parents: 2ab172a
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:32:37 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:32:37 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/x/async/AsyncWrappers.java   |  96 +++++++-
 .../curator/x/async/migrations/Migration.java   |  37 ++++
 .../x/async/migrations/MigrationException.java  |  37 ++++
 .../x/async/migrations/MigrationManager.java    | 195 ++++++++++++++++
 .../x/async/migrations/MigrationSet.java        |  72 ++++++
 .../x/async/modeled/migrations/MetaData.java    |  25 ---
 .../x/async/modeled/migrations/Migration.java   |  37 ----
 .../modeled/migrations/MigrationException.java  |  37 ----
 .../modeled/migrations/MigrationManager.java    | 220 -------------------
 .../async/modeled/migrations/MigrationSet.java  |  73 ------
 .../async/migrations/TestMigrationManager.java  | 173 +++++++++++++++
 .../x/async/migrations/models/ModelV1.java      |  39 ++++
 .../x/async/migrations/models/ModelV2.java      |  46 ++++
 .../x/async/migrations/models/ModelV3.java      |  53 +++++
 .../migrations/TestMigrationManager.java        | 173 ---------------
 .../modeled/migrations/models/ModelV1.java      |  39 ----
 .../modeled/migrations/models/ModelV2.java      |  46 ----
 .../modeled/migrations/models/ModelV3.java      |  53 -----
 18 files changed, 747 insertions(+), 704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..d7b3cc3 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,12 +18,16 @@
  */
 package org.apache.curator.x.async;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -70,6 +74,66 @@ import java.util.concurrent.TimeUnit;
 public class AsyncWrappers
 {
     /**
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned. i.e. if the initial children() call returns
+     * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if the any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+     * </p>
+     *
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path)
+    {
+        return childrenWithData(client, path, false);
+    }
+
+    /**
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned. i.e. if the initial children() call returns
+     * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if the any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+     * </p>
+     *
+     * @param isCompressed pass true if data is compressed
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed)
+    {
+        CompletableFuture<Map<String, byte[]>> future = new CompletableFuture<>();
+        client.getChildren().forPath(path).handle((children, e) -> {
+            if ( e != null )
+            {
+                if ( Throwables.getRootCause(e) instanceof KeeperException.NoNodeException )
+                {
+                    future.complete(Maps.newHashMap());
+                }
+                else
+                {
+                    future.completeExceptionally(e);
+                }
+            }
+            else
+            {
+                completeChildren(client, future, path, children, isCompressed);
+            }
+            return null;
+        });
+        return future;
+    }
+
+    /**
      * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
      * the given executor
      *
@@ -279,6 +343,36 @@ public class AsyncWrappers
         }
     }
 
+    private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed)
+    {
+        Map<String, byte[]> nodes = Maps.newHashMap();
+        if ( children.size() == 0 )
+        {
+            future.complete(nodes);
+            return;
+        }
+
+        children.forEach(node -> {
+            String path = ZKPaths.makePath(parentPath, node);
+            AsyncStage<byte[]> stage = isCompressed ? client.getData().decompressed().forPath(path) : client.getData().forPath(path);
+            stage.handle((data, e) -> {
+                if ( e != null )
+                {
+                    future.completeExceptionally(e);
+                }
+                else
+                {
+                    nodes.put(path, data);
+                    if ( nodes.size() == children.size() )
+                    {
+                        future.complete(nodes);
+                    }
+                }
+                return null;
+            });
+        });
+    }
+
     private AsyncWrappers()
     {
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
new file mode 100644
index 0000000..daac435
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import java.util.List;
+
+/**
+ * Models a single migration/transition
+ */
+@FunctionalInterface
+public interface Migration
+{
+    /**
+     * Return the operations to execute in a transaction. IMPORTANT: during a migration
+     * this method may be called multiple times.
+     *
+     * @return operations
+     */
+    List<CuratorOp> operations();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
new file mode 100644
index 0000000..c4971ce
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import java.util.Objects;
+
+public class MigrationException extends RuntimeException
+{
+    private final String migrationId;
+
+    public MigrationException(String migrationId, String message)
+    {
+        super(message);
+        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+    }
+
+    public String getMigrationId()
+    {
+        return migrationId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
new file mode 100644
index 0000000..cb3d6ff
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.google.common.base.Throwables;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.imps.ExtractingCuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+/**
+ * Manages migrations
+ */
+public class MigrationManager
+{
+    private final AsyncCuratorFramework client;
+    private final String lockPath;
+    private final Executor executor;
+    private final Duration lockMax;
+
+    private static final String META_DATA_NODE_NAME = "meta-";
+
+    /**
+     * @param client the curator client
+     * @param lockPath base path for locks used by the manager
+     * @param executor the executor to use
+     * @param lockMax max time to wait for locks
+     */
+    public MigrationManager(AsyncCuratorFramework client, String lockPath, Executor executor, Duration lockMax)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+    }
+
+    /**
+     * Process the given migration set
+     *
+     * @param set the set
+     * @return completion stage. If there is a migration-specific error, the stage will be completed
+     * exceptionally with {@link org.apache.curator.x.async.migrations.MigrationException}.
+     */
+    public CompletionStage<Void> migrate(MigrationSet set)
+    {
+        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), ZKPaths.makePath(lockPath, set.id()));
+        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
+        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+    }
+
+    /**
+     * Can be overridden to change how the comparison to previous migrations is done. The default
+     * version ensures that the meta data from previous migrations matches the current migration
+     * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
+     *
+     * @param set the migration set being applied
+     * @param operationHashesInOrder previous operation hashes (may be empty)
+     * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
+     * @throws MigrationException errors
+     */
+    protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException
+    {
+        if ( operationHashesInOrder.size() > set.migrations().size() )
+        {
+            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
+        }
+
+        int compareSize = Math.min(set.migrations().size(), operationHashesInOrder.size());
+        List<Migration> subList = set.migrations().subList(0, compareSize);
+        for ( int i = 0; i < compareSize; ++i )
+        {
+            byte[] setHash = hash(set.migrations().get(i).operations());
+            if ( !Arrays.equals(setHash, operationHashesInOrder.get(i)) )
+            {
+                throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
+            }
+        }
+        return set.migrations().subList(operationHashesInOrder.size(), set.migrations().size());
+    }
+
+    private byte[] hash(List<CuratorOp> operations)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("SHA-256");
+        }
+        catch ( NoSuchAlgorithmException e )
+        {
+            throw new RuntimeException(e);
+        }
+        operations.forEach(op -> {
+            if ( op instanceof ExtractingCuratorOp )
+            {
+                ((ExtractingCuratorOp)op).addToDigest(digest);
+            }
+            else
+            {
+                digest.update(op.toString().getBytes());
+            }
+        });
+        return digest.digest();
+    }
+
+    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+    {
+        return childrenWithData(client, set.metaDataPath())
+            .thenCompose(metaData -> applyMetaData(set, metaData))
+            .handle((v, e) -> {
+                release(lock, true);
+                if ( e != null )
+                {
+                    Throwables.propagate(e);
+                }
+                return v;
+            }
+        );
+    }
+
+    private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData)
+    {
+        List<byte[]> sortedMetaData = metaData.keySet()
+            .stream()
+            .sorted(Comparator.naturalOrder())
+            .map(metaData::get)
+            .collect(Collectors.toList());
+
+        List<Migration> toBeApplied;
+        try
+        {
+            toBeApplied = filter(set, sortedMetaData);
+        }
+        catch ( MigrationException e )
+        {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+
+        if ( toBeApplied.size() == 0 )
+        {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return asyncEnsureContainers(client, set.metaDataPath())
+            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied));
+    }
+
+    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied)
+    {
+        String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), META_DATA_NODE_NAME);
+        List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
+            List<CuratorOp> operations = new ArrayList<>();
+            operations.addAll(migration.operations());
+            operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(operations)));
+            return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
+        }).collect(Collectors.toList());
+        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
new file mode 100644
index 0000000..089d3d8
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Models a set of migrations. Each individual migration is applied
+ * in a transaction.
+ */
+public interface MigrationSet
+{
+    /**
+     * @return the unique ID for this migration set
+     */
+    String id();
+
+    /**
+     * @return where to store the meta data for this migration set
+     */
+    String metaDataPath();
+
+    /**
+     * @return list of migrations in the order that they should be applied
+     */
+    List<Migration> migrations();
+
+    static MigrationSet build(String id, String metaDataPath, List<Migration> migrations)
+    {
+        Objects.requireNonNull(id, "id cannot be null");
+        Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
+        final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
+        return new MigrationSet()
+        {
+            @Override
+            public String id()
+            {
+                return id;
+            }
+
+            @Override
+            public String metaDataPath()
+            {
+                return metaDataPath;
+            }
+
+            @Override
+            public List<Migration> migrations()
+            {
+                return migrationsCopy;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
deleted file mode 100644
index 8377967..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-@FunctionalInterface
-public interface MetaData
-{
-    byte[] operationHash();
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
deleted file mode 100644
index 972c59e..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import java.util.List;
-
-/**
- * Models a single migration/transition
- */
-@FunctionalInterface
-public interface Migration
-{
-    /**
-     * Return the operations to execute in a transaction. IMPORTANT: during a migration
-     * this method may be called multiple times.
-     *
-     * @return operations
-     */
-    List<CuratorOp> operations();
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
deleted file mode 100644
index 1a1d59c..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import java.util.Objects;
-
-public class MigrationException extends RuntimeException
-{
-    private final String migrationId;
-
-    public MigrationException(String migrationId, String message)
-    {
-        super(message);
-        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
-    }
-
-    public String getMigrationId()
-    {
-        return migrationId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
deleted file mode 100644
index d6d37de..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.base.Throwables;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.imps.ExtractingCuratorOp;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZNode;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.CreateMode;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.curator.x.async.AsyncWrappers.*;
-
-/**
- * Manages migrations
- */
-public class MigrationManager
-{
-    private final AsyncCuratorFramework client;
-    private final ZPath lockPath;
-    private final Executor executor;
-    private final Duration lockMax;
-
-    private static final String META_DATA_NODE_NAME = "meta-";
-
-    /**
-     * @param client the curator client
-     * @param lockPath base path for locks used by the manager
-     * @param executor the executor to use
-     * @param lockMax max time to wait for locks
-     */
-    public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax)
-    {
-        this.client = Objects.requireNonNull(client, "client cannot be null");
-        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
-        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
-        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
-    }
-
-    /**
-     * Process the given migration set
-     *
-     * @param set the set
-     * @return completion stage. If there is a migration-specific error, the stage will be completed
-     * exceptionally with {@link org.apache.curator.x.async.modeled.migrations.MigrationException}.
-     */
-    public CompletionStage<Void> migrate(MigrationSet set)
-    {
-        String lockPath = this.lockPath.child(set.id()).fullPath();
-        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
-        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
-        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
-    }
-
-    /**
-     * Can be overridden to change how the comparison to previous migrations is done. The default
-     * version ensures that the meta data from previous migrations matches the current migration
-     * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
-     *
-     * @param set the migration set being applied
-     * @param sortedMetaData previous migration meta data (may be empty)
-     * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
-     * @throws MigrationException errors
-     */
-    protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
-    {
-        if ( sortedMetaData.size() > set.migrations().size() )
-        {
-            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
-        }
-
-        int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
-        List<Migration> subList = set.migrations().subList(0, compareSize);
-        for ( int i = 0; i < compareSize; ++i )
-        {
-            byte[] setHash = hash(set.migrations().get(i).operations()).operationHash();
-            if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) )
-            {
-                throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
-            }
-        }
-        return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
-    }
-
-    private MetaData hash(List<CuratorOp> operations)
-    {
-        MessageDigest digest;
-        try
-        {
-            digest = MessageDigest.getInstance("SHA-256");
-        }
-        catch ( NoSuchAlgorithmException e )
-        {
-            throw new RuntimeException(e);
-        }
-        operations.forEach(op -> {
-            if ( op instanceof ExtractingCuratorOp )
-            {
-                ((ExtractingCuratorOp)op).addToDigest(digest);
-            }
-            else
-            {
-                digest.update(op.toString().getBytes());
-            }
-        });
-        return digest::digest;
-    }
-
-    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
-        return modeled.childrenAsZNodes()
-            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
-            .handle((v, e) -> {
-                release(lock, true);
-                if ( e != null )
-                {
-                    Throwables.propagate(e);
-                }
-                return v;
-            }
-        );
-    }
-
-    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
-    {
-        ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>()
-        {
-            @Override
-            public byte[] serialize(MetaData model)
-            {
-                return model.operationHash();
-            }
-
-            @Override
-            public MetaData deserialize(byte[] bytes)
-            {
-                return () -> bytes;
-            }
-        };
-        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
-        return ModeledFramework.wrap(client, modelSpec);
-    }
-
-    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
-    {
-        List<MetaData> sortedMetaData = metaDataNodes
-            .stream()
-            .sorted(Comparator.comparing(m -> m.path().fullPath()))
-            .map(ZNode::model)
-            .collect(Collectors.toList());
-
-        List<Migration> toBeApplied;
-        try
-        {
-            toBeApplied = filter(set, sortedMetaData);
-        }
-        catch ( MigrationException e )
-        {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(e);
-            return future;
-        }
-
-        if ( toBeApplied.size() == 0 )
-        {
-            return CompletableFuture.completedFuture(null);
-        }
-
-        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
-            .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient));
-    }
-
-    private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
-    {
-        List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
-            List<CuratorOp> operations = new ArrayList<>();
-            operations.addAll(migration.operations());
-            MetaData thisMetaData = hash(operations);
-            operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
-            return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
-        }).collect(Collectors.toList());
-        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
deleted file mode 100644
index c4cd90e..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.curator.x.async.modeled.ZPath;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Models a set of migrations. Each individual migration is applied
- * in a transaction.
- */
-public interface MigrationSet
-{
-    /**
-     * @return the unique ID for this migration set
-     */
-    String id();
-
-    /**
-     * @return where to store the meta data for this migration set
-     */
-    ZPath metaDataPath();
-
-    /**
-     * @return list of migrations in the order that they should be applied
-     */
-    List<Migration> migrations();
-
-    static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
-    {
-        Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
-        final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
-        return new MigrationSet()
-        {
-            @Override
-            public String id()
-            {
-                return id;
-            }
-
-            @Override
-            public ZPath metaDataPath()
-            {
-                return metaDataPath;
-            }
-
-            @Override
-            public List<Migration> migrations()
-            {
-                return migrationsCopy;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
new file mode 100644
index 0000000..42bc76d
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.modeled.JacksonModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.migrations.models.ModelV1;
+import org.apache.curator.x.async.migrations.models.ModelV2;
+import org.apache.curator.x.async.migrations.models.ModelV3;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.UnaryOperator;
+
+public class TestMigrationManager extends CompletableBaseClassForTests
+{
+    private AsyncCuratorFramework client;
+    private ModelSpec<ModelV1> v1Spec;
+    private ModelSpec<ModelV2> v2Spec;
+    private ModelSpec<ModelV3> v3Spec;
+    private ExecutorService executor;
+    private CuratorOp v1opA;
+    private CuratorOp v1opB;
+    private CuratorOp v2op;
+    private CuratorOp v3op;
+    private MigrationManager manager;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        rawClient.start();
+
+        this.client = AsyncCuratorFramework.wrap(rawClient);
+
+        ObjectMapper mapper = new ObjectMapper();
+        UnaryOperator<byte[]> from1to2 = bytes -> {
+            try
+            {
+                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
+                ModelV2 v2 = new ModelV2(v1.getName(), 64);
+                return mapper.writeValueAsBytes(v2);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        UnaryOperator<byte[]> from2to3 = bytes -> {
+            try
+            {
+                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
+                String[] nameParts = v2.getName().split("\\s");
+                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
+                return mapper.writeValueAsBytes(v3);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        ZPath modelPath = ZPath.parse("/test/it");
+
+        v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
+        v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
+        v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+
+        v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
+        v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+        v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+        v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
+
+        executor = Executors.newCachedThreadPool();
+        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        CloseableUtils.closeQuietly(client.unwrap());
+        executor.shutdownNow();
+        super.teardown();
+    }
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        Migration m2 = () -> Collections.singletonList(v2op);
+        Migration m3 = () -> Collections.singletonList(v3op);
+        MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3));
+
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
+        });
+    }
+
+    @Test
+    public void testStaged() throws Exception
+    {
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
+        complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
+
+        Migration m2 = () -> Collections.singletonList(v2op);
+        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
+        complete(v2Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getName(), "Test 2");
+            Assert.assertEquals(m.getAge(), 10);
+        });
+
+        Migration m3 = () -> Collections.singletonList(v3op);
+        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
new file mode 100644
index 0000000..46fc5ff
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV1
+{
+    private final String name;
+
+    public ModelV1()
+    {
+        this("");
+    }
+
+    public ModelV1(String name)
+    {
+        this.name = name;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
new file mode 100644
index 0000000..31e05bd
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV2
+{
+    private final String name;
+    private final int age;
+
+    public ModelV2()
+    {
+        this("", 0);
+    }
+
+    public ModelV2(String name, int age)
+    {
+        this.name = name;
+        this.age = age;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    public int getAge()
+    {
+        return age;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
new file mode 100644
index 0000000..519923c
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV3
+{
+    private final String firstName;
+    private final String lastName;
+    private final int age;
+
+    public ModelV3()
+    {
+        this("", "", 0);
+    }
+
+    public ModelV3(String firstName, String lastName, int age)
+    {
+        this.firstName = firstName;
+        this.lastName = lastName;
+        this.age = age;
+    }
+
+    public String getFirstName()
+    {
+        return firstName;
+    }
+
+    public String getLastName()
+    {
+        return lastName;
+    }
+
+    public int getAge()
+    {
+        return age;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
deleted file mode 100644
index 45aa130..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.CompletableBaseClassForTests;
-import org.apache.curator.x.async.modeled.JacksonModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV1;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV2;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV3;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.UnaryOperator;
-
-public class TestMigrationManager extends CompletableBaseClassForTests
-{
-    private AsyncCuratorFramework client;
-    private ModelSpec<ModelV1> v1Spec;
-    private ModelSpec<ModelV2> v2Spec;
-    private ModelSpec<ModelV3> v3Spec;
-    private ExecutorService executor;
-    private CuratorOp v1opA;
-    private CuratorOp v1opB;
-    private CuratorOp v2op;
-    private CuratorOp v3op;
-    private MigrationManager manager;
-
-    @BeforeMethod
-    @Override
-    public void setup() throws Exception
-    {
-        super.setup();
-
-        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
-        rawClient.start();
-
-        this.client = AsyncCuratorFramework.wrap(rawClient);
-
-        ObjectMapper mapper = new ObjectMapper();
-        UnaryOperator<byte[]> from1to2 = bytes -> {
-            try
-            {
-                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
-                ModelV2 v2 = new ModelV2(v1.getName(), 64);
-                return mapper.writeValueAsBytes(v2);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
-        UnaryOperator<byte[]> from2to3 = bytes -> {
-            try
-            {
-                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
-                String[] nameParts = v2.getName().split("\\s");
-                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
-                return mapper.writeValueAsBytes(v3);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
-        ZPath modelPath = ZPath.parse("/test/it");
-
-        v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
-        v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
-        v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
-
-        v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
-        v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
-        v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
-        v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
-
-        executor = Executors.newCachedThreadPool();
-        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
-    }
-
-    @AfterMethod
-    @Override
-    public void teardown() throws Exception
-    {
-        CloseableUtils.closeQuietly(client.unwrap());
-        executor.shutdownNow();
-        super.teardown();
-    }
-
-    @Test
-    public void testBasic() throws Exception
-    {
-        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
-        Migration m2 = () -> Collections.singletonList(v2op);
-        Migration m3 = () -> Collections.singletonList(v3op);
-        MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
-
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
-        complete(v3Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getAge(), 30);
-            Assert.assertEquals(m.getFirstName(), "One");
-            Assert.assertEquals(m.getLastName(), "Two");
-        });
-    }
-
-    @Test
-    public void testStaged() throws Exception
-    {
-        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
-        MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
-        complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
-
-        Migration m2 = () -> Collections.singletonList(v2op);
-        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
-        complete(v2Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getName(), "Test 2");
-            Assert.assertEquals(m.getAge(), 10);
-        });
-
-        Migration m3 = () -> Collections.singletonList(v3op);
-        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
-        complete(v3Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getAge(), 30);
-            Assert.assertEquals(m.getFirstName(), "One");
-            Assert.assertEquals(m.getLastName(), "Two");
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
deleted file mode 100644
index 02b13b7..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV1
-{
-    private final String name;
-
-    public ModelV1()
-    {
-        this("");
-    }
-
-    public ModelV1(String name)
-    {
-        this.name = name;
-    }
-
-    public String getName()
-    {
-        return name;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
deleted file mode 100644
index bd77a2e..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV2
-{
-    private final String name;
-    private final int age;
-
-    public ModelV2()
-    {
-        this("", 0);
-    }
-
-    public ModelV2(String name, int age)
-    {
-        this.name = name;
-        this.age = age;
-    }
-
-    public String getName()
-    {
-        return name;
-    }
-
-    public int getAge()
-    {
-        return age;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
deleted file mode 100644
index d4713b8..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV3
-{
-    private final String firstName;
-    private final String lastName;
-    private final int age;
-
-    public ModelV3()
-    {
-        this("", "", 0);
-    }
-
-    public ModelV3(String firstName, String lastName, int age)
-    {
-        this.firstName = firstName;
-        this.lastName = lastName;
-        this.age = age;
-    }
-
-    public String getFirstName()
-    {
-        return firstName;
-    }
-
-    public String getLastName()
-    {
-        return lastName;
-    }
-
-    public int getAge()
-    {
-        return age;
-    }
-}


[02/23] curator git commit: Merge branch 'CURATOR-423' into CURATOR-421

Posted by ra...@apache.org.
Merge branch 'CURATOR-423' into CURATOR-421


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1784a7c6
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1784a7c6
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1784a7c6

Branch: refs/heads/master
Commit: 1784a7c68982bae8bed471d92464168f87b0fceb
Parents: 4cd731c 4a0e022
Author: randgalt <ra...@apache.org>
Authored: Thu Jul 13 23:52:58 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jul 13 23:52:58 2017 -0500

----------------------------------------------------------------------
 .../x/async/details/AsyncTransactionOpImpl.java |  4 ++--
 .../curator/x/async/TestBasicOperations.java    | 22 ++++++++++++++------
 2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------



[22/23] curator git commit: reformat doc a bit

Posted by ra...@apache.org.
reformat doc a bit


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/69ec4d7e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/69ec4d7e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/69ec4d7e

Branch: refs/heads/master
Commit: 69ec4d7e6cbe30452fc096f1d5fb706ba118413c
Parents: 33e4138
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 18 08:15:57 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 18 08:15:57 2017 -0500

----------------------------------------------------------------------
 curator-x-async/src/site/confluence/migrations.confluence | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/69ec4d7e/curator-x-async/src/site/confluence/migrations.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence
index bd2d36f..403e64a 100644
--- a/curator-x-async/src/site/confluence/migrations.confluence
+++ b/curator-x-async/src/site/confluence/migrations.confluence
@@ -91,9 +91,9 @@ manager.migrate(set).exceptionally(e -> {
 });
 {code}
 
-Each migration in the set is applied in a single transaction - i.e. all operations that comprise
+* Each migration in the set is applied in a single transaction - i.e. all operations that comprise
 a migration set (the sum of all individual migration operations) are sent to ZooKeeper as a single
-transaction. MigrationManager stores a hash
-of all operations in a migration so that it can be compared for future operations. i.e.
+transaction.
+* MigrationManager stores a hash of all operations in a migration so that it can be compared for future operations. i.e.
 if, in the future, a migration set is attempted but the hash of one of the previous migrations
 does not match, the stage completes exceptionally with {{MigrationException}}.


[15/23] curator git commit: more testing, bug fixes

Posted by ra...@apache.org.
more testing, bug fixes


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f7e728b9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f7e728b9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f7e728b9

Branch: refs/heads/master
Commit: f7e728b99b8bf8cf1e0b2d68da1c3bf10e75d52a
Parents: 909ed9a
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 17:06:42 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 17:06:42 2017 -0500

----------------------------------------------------------------------
 .../framework/imps/ExtractingCuratorOp.java     |  2 +-
 .../x/async/migrations/MigrationManager.java    |  7 +++
 .../async/migrations/TestMigrationManager.java  | 53 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/f7e728b9/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
index 58a1572..5b179e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
@@ -49,7 +49,7 @@ public class ExtractingCuratorOp implements CuratorOp
 
     public void addToDigest(MessageDigest digest)
     {
-
+        record.addToDigest(digest);
     }
 
     private void validate()

http://git-wip-us.apache.org/repos/asf/curator/blob/f7e728b9/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
index 676eef6..56e7f04 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async.migrations;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.imps.ExtractingCuratorOp;
@@ -39,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 import static org.apache.curator.x.async.AsyncWrappers.*;
@@ -185,8 +187,13 @@ public class MigrationManager
             .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath));
     }
 
+    @VisibleForTesting
+    volatile AtomicInteger debugCount = null;
+
     private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath)
     {
+        debugCount.incrementAndGet();
+
         String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME);
         List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
             List<CuratorOp> operations = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/curator/blob/f7e728b9/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 19740d6..3522911 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.x.async.migrations;
 
+import com.google.common.base.Throwables;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -41,6 +42,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestMigrationManager extends CompletableBaseClassForTests
 {
@@ -79,6 +81,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
 
         executor = Executors.newCachedThreadPool();
         manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
+        manager.debugCount = new AtomicInteger();
     }
 
     @AfterMethod
@@ -106,6 +109,10 @@ public class TestMigrationManager extends CompletableBaseClassForTests
             Assert.assertEquals(m.getFirstName(), "One");
             Assert.assertEquals(m.getLastName(), "Two");
         });
+
+        int count = manager.debugCount.get();
+        complete(manager.migrate(migrationSet));
+        Assert.assertEquals(manager.debugCount.get(), count);   // second call should do nothing
     }
 
     @Test
@@ -165,4 +172,50 @@ public class TestMigrationManager extends CompletableBaseClassForTests
 
         Assert.assertNull(client.unwrap().checkExists().forPath("/main"));
     }
+
+    @Test
+    public void testChecksumDataError() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        complete(manager.migrate(migrationSet));
+
+        CuratorOp op2Changed = client.transactionOp().create().forPath("/test/bar", "second".getBytes());
+        migration = () -> Arrays.asList(op1, op2Changed);
+        migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        try
+        {
+            complete(manager.migrate(migrationSet));
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof MigrationException);
+        }
+    }
+
+    @Test
+    public void testChecksumPathError() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test2");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test2/bar");
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        complete(manager.migrate(migrationSet));
+
+        CuratorOp op2Changed = client.transactionOp().create().forPath("/test/bar");
+        migration = () -> Arrays.asList(op1, op2Changed);
+        migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        try
+        {
+            complete(manager.migrate(migrationSet));
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof MigrationException);
+        }
+    }
 }


[20/23] curator git commit: break up the help menu a bit

Posted by ra...@apache.org.
break up the help menu a bit


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c77ef823
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c77ef823
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c77ef823

Branch: refs/heads/master
Commit: c77ef823f91c4ef8713614aec181f681e7a7af8c
Parents: 84191f3
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:28:55 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:28:55 2017 -0500

----------------------------------------------------------------------
 src/site/site.xml | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c77ef823/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
index 4b1ac98..8136c9a 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -70,15 +70,18 @@
             <item name="Getting Started" href="getting-started.html"/>
             <item name="Examples" href="curator-examples/index.html"/>
             <item name="Recipes" href="curator-recipes/index.html"/>
-            <item name="Framework" href="curator-framework/index.html"/>
-            <item name="Utilities" href="utilities.html"/>
-            <item name="Client" href="curator-client/index.html"/>
             <item name="Java 8/Async" href="curator-x-async/index.html"/>
             <item name="Strongly Typed Models" href="curator-x-async/modeled.html"/>
             <item name="Migrations" href="curator-x-async/migrations.html"/>
             <item name="Schema Support" href="curator-framework/schema.html"/>
         </menu>
 
+        <menu name="Low Level" inherit="top">
+            <item name="Framework" href="curator-framework/index.html"/>
+            <item name="Utilities" href="utilities.html"/>
+            <item name="Client" href="curator-client/index.html"/>
+        </menu>
+
         <menu name="Details" inherit="top">
             <item name="Error Handling" href="errors.html"/>
             <item name="Logging and Tracing" href="logging.html"/>


[09/23] curator git commit: refactoring and simplification. No need for ids and versions in Migrations/MetaData. A hash can be auto-generated.

Posted by ra...@apache.org.
refactoring and simplification. No need for ids and versions in Migrations/MetaData. A hash can be auto-generated.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c8df9a41
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c8df9a41
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c8df9a41

Branch: refs/heads/master
Commit: c8df9a414b9a035f45946460ff7e1adff7fd65d4
Parents: 4f12abc
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:00:27 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:00:27 2017 -0500

----------------------------------------------------------------------
 .../imps/CuratorMultiTransactionRecord.java     | 11 +++
 .../framework/imps/ExtractingCuratorOp.java     |  8 +-
 .../x/async/modeled/migrations/MetaData.java    | 74 +-----------------
 .../x/async/modeled/migrations/Migration.java   | 49 ++----------
 .../modeled/migrations/MigrationManager.java    | 81 +++++++++++++-------
 .../src/site/confluence/index.confluence        |  1 +
 .../migrations/TestMigrationManager.java        | 14 ++--
 7 files changed, 87 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 0611df6..3e72609 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.api.transaction.OperationType;
 import org.apache.curator.framework.api.transaction.TypeAndPath;
 import org.apache.zookeeper.MultiTransactionRecord;
 import org.apache.zookeeper.Op;
+import java.security.MessageDigest;
 import java.util.List;
 
 class CuratorMultiTransactionRecord extends MultiTransactionRecord
@@ -50,4 +51,14 @@ class CuratorMultiTransactionRecord extends MultiTransactionRecord
     {
         return metadata.size();
     }
+
+    void addToDigest(MessageDigest digest)
+    {
+        for ( Op op : this )
+        {
+            digest.update(op.getPath().getBytes());
+            digest.update(Integer.toString(op.getType()).getBytes());
+            digest.update(op.toRequestRecord().toString().getBytes());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
index 7a5db69..58a1572 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
@@ -22,8 +22,9 @@ import com.google.common.base.Preconditions;
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.framework.api.transaction.TypeAndPath;
 import org.apache.zookeeper.Op;
+import java.security.MessageDigest;
 
-class ExtractingCuratorOp implements CuratorOp
+public class ExtractingCuratorOp implements CuratorOp
 {
     private final CuratorMultiTransactionRecord record = new CuratorMultiTransactionRecord();
 
@@ -46,6 +47,11 @@ class ExtractingCuratorOp implements CuratorOp
         return record.iterator().next();
     }
 
+    public void addToDigest(MessageDigest digest)
+    {
+
+    }
+
     private void validate()
     {
         Preconditions.checkArgument(record.size() > 0, "No operation has been added");

http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
index da40a5b..8377967 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
@@ -18,76 +18,8 @@
  */
 package org.apache.curator.x.async.modeled.migrations;
 
-import java.util.Objects;
-
-/**
- * The meta data of a single migration
- */
-public class MetaData
+@FunctionalInterface
+public interface MetaData
 {
-    private final String migrationId;
-    private final int migrationVersion;
-
-    public MetaData()
-    {
-        this("", 0);
-    }
-
-    public MetaData(String migrationId, int migrationVersion)
-    {
-        this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
-        this.migrationVersion = migrationVersion;
-    }
-
-    /**
-     * @return The ID of the migration that was applied
-     */
-    public String getMigrationId()
-    {
-        return migrationId;
-    }
-
-    /**
-     * @return the version of the migration that was applied
-     */
-    public int getMigrationVersion()
-    {
-        return migrationVersion;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if ( this == o )
-        {
-            return true;
-        }
-        if ( o == null || getClass() != o.getClass() )
-        {
-            return false;
-        }
-
-        MetaData metaData = (MetaData)o;
-
-        //noinspection SimplifiableIfStatement
-        if ( migrationVersion != metaData.migrationVersion )
-        {
-            return false;
-        }
-        return migrationId.equals(metaData.migrationId);
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = migrationId.hashCode();
-        result = 31 * result + migrationVersion;
-        return result;
-    }
-
-    @Override
-    public String toString()
-    {
-        return "MetaData{" + "migrationId='" + migrationId + '\'' + ", migrationVersion=" + migrationVersion + '}';
-    }
+    byte[] operationHash();
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index b456580..972c59e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -20,57 +20,18 @@ package org.apache.curator.x.async.modeled.migrations;
 
 import org.apache.curator.framework.api.transaction.CuratorOp;
 import java.util.List;
-import java.util.Objects;
-import java.util.function.Supplier;
 
 /**
  * Models a single migration/transition
  */
+@FunctionalInterface
 public interface Migration
 {
     /**
-     * @return the unique ID for this migration
-     */
-    String id();
-
-    /**
-     * @return the version of this migration
-     */
-    int version();
-
-    /**
-     * @return the operations to execute in a transaction
+     * Return the operations to execute in a transaction. IMPORTANT: during a migration
+     * this method may be called multiple times.
+     *
+     * @return operations
      */
     List<CuratorOp> operations();
-
-    static Migration build(String id, Supplier<List<CuratorOp>> operationsProc)
-    {
-        return build(id, 1, operationsProc);
-    }
-
-    static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
-    {
-        Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(operationsProc, "operationsProc cannot be null");
-        return new Migration()
-        {
-            @Override
-            public String id()
-            {
-                return id;
-            }
-
-            @Override
-            public int version()
-            {
-                return version;
-            }
-
-            @Override
-            public List<CuratorOp> operations()
-            {
-                return operationsProc.get();
-            }
-        };
-    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 01de2f8..d6d37de 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async.modeled.migrations;
 
 import com.google.common.base.Throwables;
 import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.imps.ExtractingCuratorOp;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
 import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -29,8 +30,11 @@ import org.apache.curator.x.async.modeled.ModeledFramework;
 import org.apache.curator.x.async.modeled.ZNode;
 import org.apache.curator.x.async.modeled.ZPath;
 import org.apache.zookeeper.CreateMode;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Objects;
@@ -49,27 +53,21 @@ public class MigrationManager
 {
     private final AsyncCuratorFramework client;
     private final ZPath lockPath;
-    private final ModelSerializer<MetaData> metaDataSerializer;
     private final Executor executor;
     private final Duration lockMax;
 
     private static final String META_DATA_NODE_NAME = "meta-";
 
     /**
-     * Jackson usage: See the note in {@link org.apache.curator.x.async.modeled.JacksonModelSerializer} regarding how the Jackson library is specified in Curator's Maven file.
-     * Unless you are not using Jackson pass <code>JacksonModelSerializer.build(MetaData.class)</code> for <code>metaDataSerializer</code>
-     *
      * @param client the curator client
      * @param lockPath base path for locks used by the manager
-     * @param metaDataSerializer JacksonModelSerializer.build(MetaData.class)
      * @param executor the executor to use
      * @param lockMax max time to wait for locks
      */
-    public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
+    public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax)
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");
         this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
-        this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
         this.executor = Objects.requireNonNull(executor, "executor cannot be null");
         this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
     }
@@ -90,18 +88,6 @@ public class MigrationManager
     }
 
     /**
-     * Utility to return the meta data from previous migrations
-     *
-     * @param set the set
-     * @return stage
-     */
-    public CompletionStage<List<MetaData>> metaData(MigrationSet set)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
-        return ZNode.models(modeled.childrenAsZNodes());
-    }
-
-    /**
      * Can be overridden to change how the comparison to previous migrations is done. The default
      * version ensures that the meta data from previous migrations matches the current migration
      * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
@@ -115,21 +101,46 @@ public class MigrationManager
     {
         if ( sortedMetaData.size() > set.migrations().size() )
         {
-            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
+            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
         }
 
         int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
-        List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
-            .stream()
-            .map(m -> new MetaData(m.id(), m.version()))
-            .collect(Collectors.toList());
-        if ( !compareMigrations.equals(sortedMetaData) )
+        List<Migration> subList = set.migrations().subList(0, compareSize);
+        for ( int i = 0; i < compareSize; ++i )
         {
-            throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
+            byte[] setHash = hash(set.migrations().get(i).operations()).operationHash();
+            if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) )
+            {
+                throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
+            }
         }
         return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
     }
 
+    private MetaData hash(List<CuratorOp> operations)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("SHA-256");
+        }
+        catch ( NoSuchAlgorithmException e )
+        {
+            throw new RuntimeException(e);
+        }
+        operations.forEach(op -> {
+            if ( op instanceof ExtractingCuratorOp )
+            {
+                ((ExtractingCuratorOp)op).addToDigest(digest);
+            }
+            else
+            {
+                digest.update(op.toString().getBytes());
+            }
+        });
+        return digest::digest;
+    }
+
     private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
     {
         ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
@@ -148,7 +159,21 @@ public class MigrationManager
 
     private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
     {
-        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+        ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>()
+        {
+            @Override
+            public byte[] serialize(MetaData model)
+            {
+                return model.operationHash();
+            }
+
+            @Override
+            public MetaData deserialize(byte[] bytes)
+            {
+                return () -> bytes;
+            }
+        };
+        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
         return ModeledFramework.wrap(client, modelSpec);
     }
 
@@ -186,7 +211,7 @@ public class MigrationManager
         List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
             List<CuratorOp> operations = new ArrayList<>();
             operations.addAll(migration.operations());
-            MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+            MetaData thisMetaData = hash(operations);
             operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
             return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
         }).collect(Collectors.toList());

http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/index.confluence b/curator-x-async/src/site/confluence/index.confluence
index 4d81d44..74a47b4 100644
--- a/curator-x-async/src/site/confluence/index.confluence
+++ b/curator-x-async/src/site/confluence/index.confluence
@@ -40,6 +40,7 @@ This is a strongly typed DSL that allows you to map a Curator\-style client to:
 * Options for how nodes should be created (sequential, compressed data, ttl, etc.)
 * ACLs for the nodes at the path
 * Options for how to delete nodes (guaranteed, deleting children, etc.)
+* Perform ZooKeeper data migration
 
 For example:
 

http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index 3fe5de2..45aa130 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -109,7 +109,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
 
         executor = Executors.newCachedThreadPool();
-        manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
     }
 
     @AfterMethod
@@ -124,9 +124,9 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     @Test
     public void testBasic() throws Exception
     {
-        Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
-        Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
-        Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        Migration m2 = () -> Collections.singletonList(v2op);
+        Migration m3 = () -> Collections.singletonList(v3op);
         MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
 
         complete(manager.migrate(migrationSet));
@@ -142,14 +142,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     @Test
     public void testStaged() throws Exception
     {
-        Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
         MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
         complete(manager.migrate(migrationSet));
 
         ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
         complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
 
-        Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+        Migration m2 = () -> Collections.singletonList(v2op);
         migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
         complete(manager.migrate(migrationSet));
 
@@ -159,7 +159,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
             Assert.assertEquals(m.getAge(), 10);
         });
 
-        Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+        Migration m3 = () -> Collections.singletonList(v3op);
         migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
         complete(manager.migrate(migrationSet));
 


[13/23] curator git commit: removed some dead code

Posted by ra...@apache.org.
removed some dead code


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/72ff5a9f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/72ff5a9f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/72ff5a9f

Branch: refs/heads/master
Commit: 72ff5a9fa4c8b0db85dd4e017009143147ac9446
Parents: c40a383
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 14:11:04 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 14:11:04 2017 -0500

----------------------------------------------------------------------
 .../async/migrations/TestMigrationManager.java  | 37 ++------------------
 1 file changed, 3 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/72ff5a9f/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 637aca1..19740d6 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.curator.x.async.migrations;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -26,24 +25,22 @@ import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
 import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.migrations.models.ModelV1;
+import org.apache.curator.x.async.migrations.models.ModelV2;
+import org.apache.curator.x.async.migrations.models.ModelV3;
 import org.apache.curator.x.async.modeled.JacksonModelSerializer;
 import org.apache.curator.x.async.modeled.ModelSpec;
 import org.apache.curator.x.async.modeled.ModeledFramework;
 import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.curator.x.async.migrations.models.ModelV1;
-import org.apache.curator.x.async.migrations.models.ModelV2;
-import org.apache.curator.x.async.migrations.models.ModelV3;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.function.UnaryOperator;
 
 public class TestMigrationManager extends CompletableBaseClassForTests
 {
@@ -69,34 +66,6 @@ public class TestMigrationManager extends CompletableBaseClassForTests
 
         this.client = AsyncCuratorFramework.wrap(rawClient);
 
-        ObjectMapper mapper = new ObjectMapper();
-        UnaryOperator<byte[]> from1to2 = bytes -> {
-            try
-            {
-                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
-                ModelV2 v2 = new ModelV2(v1.getName(), 64);
-                return mapper.writeValueAsBytes(v2);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
-        UnaryOperator<byte[]> from2to3 = bytes -> {
-            try
-            {
-                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
-                String[] nameParts = v2.getName().split("\\s");
-                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
-                return mapper.writeValueAsBytes(v3);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
         ZPath modelPath = ZPath.parse("/test/it");
 
         v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();


[23/23] curator git commit: Merge branch 'master' into CURATOR-421

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7a60af0d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7a60af0d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7a60af0d

Branch: refs/heads/master
Commit: 7a60af0dddbf1e547ddb7448ce99e14341413ef0
Parents: 69ec4d7 7d4f062
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 18 08:16:05 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 18 08:16:05 2017 -0500

----------------------------------------------------------------------
 .../framework/imps/GetDataBuilderImpl.java      |   5 +
 .../modeled/details/ModeledFrameworkImpl.java   |   2 +-
 .../curator/x/async/TestBasicOperations.java    |  10 ++
 .../x/async/modeled/TestModeledFramework.java   |  34 ++---
 .../discovery/details/TestServiceDiscovery.java | 130 +++++++------------
 5 files changed, 78 insertions(+), 103 deletions(-)
----------------------------------------------------------------------



[12/23] curator git commit: some more refactoring, tests and doc

Posted by ra...@apache.org.
some more refactoring, tests and doc


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c40a3836
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c40a3836
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c40a3836

Branch: refs/heads/master
Commit: c40a3836131ad49279e05a5c3dd6656848c1eb60
Parents: d80651a
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 13:32:05 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 13:32:05 2017 -0500

----------------------------------------------------------------------
 .../x/async/migrations/MigrationManager.java    | 20 ++--
 .../x/async/migrations/MigrationSet.java        | 14 +--
 .../src/site/confluence/index.confluence        |  9 +-
 .../src/site/confluence/migrations.confluence   | 97 ++++++++++++++++++++
 curator-x-async/src/site/site.xml               |  2 +-
 .../async/migrations/TestMigrationManager.java  | 36 +++++++-
 6 files changed, 150 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
index cb3d6ff..676eef6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -50,6 +50,7 @@ public class MigrationManager
 {
     private final AsyncCuratorFramework client;
     private final String lockPath;
+    private final String metaDataPath;
     private final Executor executor;
     private final Duration lockMax;
 
@@ -58,13 +59,15 @@ public class MigrationManager
     /**
      * @param client the curator client
      * @param lockPath base path for locks used by the manager
+     * @param metaDataPath base path to store the meta data
      * @param executor the executor to use
      * @param lockMax max time to wait for locks
      */
-    public MigrationManager(AsyncCuratorFramework client, String lockPath, Executor executor, Duration lockMax)
+    public MigrationManager(AsyncCuratorFramework client, String lockPath, String metaDataPath, Executor executor, Duration lockMax)
     {
         this.client = Objects.requireNonNull(client, "client cannot be null");
         this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+        this.metaDataPath = Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
         this.executor = Objects.requireNonNull(executor, "executor cannot be null");
         this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
     }
@@ -139,8 +142,9 @@ public class MigrationManager
 
     private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
     {
-        return childrenWithData(client, set.metaDataPath())
-            .thenCompose(metaData -> applyMetaData(set, metaData))
+        String thisMetaDataPath = ZKPaths.makePath(metaDataPath, set.id());
+        return childrenWithData(client, thisMetaDataPath)
+            .thenCompose(metaData -> applyMetaData(set, metaData, thisMetaDataPath))
             .handle((v, e) -> {
                 release(lock, true);
                 if ( e != null )
@@ -152,7 +156,7 @@ public class MigrationManager
         );
     }
 
-    private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData)
+    private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData, String thisMetaDataPath)
     {
         List<byte[]> sortedMetaData = metaData.keySet()
             .stream()
@@ -177,13 +181,13 @@ public class MigrationManager
             return CompletableFuture.completedFuture(null);
         }
 
-        return asyncEnsureContainers(client, set.metaDataPath())
-            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied));
+        return asyncEnsureContainers(client, thisMetaDataPath)
+            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath));
     }
 
-    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied)
+    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath)
     {
-        String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), META_DATA_NODE_NAME);
+        String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME);
         List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
             List<CuratorOp> operations = new ArrayList<>();
             operations.addAll(migration.operations());

http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
index 089d3d8..94b5205 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
@@ -34,19 +34,13 @@ public interface MigrationSet
     String id();
 
     /**
-     * @return where to store the meta data for this migration set
-     */
-    String metaDataPath();
-
-    /**
      * @return list of migrations in the order that they should be applied
      */
     List<Migration> migrations();
 
-    static MigrationSet build(String id, String metaDataPath, List<Migration> migrations)
+    static MigrationSet build(String id, List<Migration> migrations)
     {
         Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
         final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
         return new MigrationSet()
         {
@@ -57,12 +51,6 @@ public interface MigrationSet
             }
 
             @Override
-            public String metaDataPath()
-            {
-                return metaDataPath;
-            }
-
-            @Override
             public List<Migration> migrations()
             {
                 return migrationsCopy;

http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/index.confluence b/curator-x-async/src/site/confluence/index.confluence
index 74a47b4..64f3786 100644
--- a/curator-x-async/src/site/confluence/index.confluence
+++ b/curator-x-async/src/site/confluence/index.confluence
@@ -40,7 +40,6 @@ This is a strongly typed DSL that allows you to map a Curator\-style client to:
 * Options for how nodes should be created (sequential, compressed data, ttl, etc.)
 * ACLs for the nodes at the path
 * Options for how to delete nodes (guaranteed, deleting children, etc.)
-* Perform ZooKeeper data migration
 
 For example:
 
@@ -50,3 +49,11 @@ modeled.set(new Foo());
 {code}
 
 See [[Modeled Curator|modeled.html]] for details.
+
+h2. [[Migrations|migrations.html]]
+
+Curator Migrations allow you pre\-apply transactions in a staged manner so that you
+can ensure a consistent state for parts of your ZooKeeper node hierarchy in a manner
+similar to database migration utilities.
+
+See [[Migrations|migrations.html]] for details.

http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/confluence/migrations.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence
new file mode 100644
index 0000000..4775fac
--- /dev/null
+++ b/curator-x-async/src/site/confluence/migrations.confluence
@@ -0,0 +1,97 @@
+h1. Migrations
+
+Curator Migrations allow you pre\-apply transactions in a staged manner so that you
+can ensure a consistent state for parts of your ZooKeeper node hierarchy in a manner
+similar to database migration utilities.
+
+h2. Background and Usage
+
+Note: To use Migrations, you should be familiar with Java 8's lambdas, CompletedFuture and CompletionStage.
+
+A "migration" is a set of operations to be performed in a transaction. A "migration set" is a list
+of migrations. Combined, this can be used to ensure an initial state for your ZooKeeper nodes as
+well as supporting upgrading/modifying existing state.
+
+For example, given a brand new ZooKeeper instance you might want to populate a few nodes
+and data. E.g.
+
+{code}
+CuratorOp op1 = client.transactionOp().create().forPath("/parent");
+CuratorOp op2 = client.transactionOp().create().forPath("/parent/one");
+CuratorOp op3 = client.transactionOp().create().forPath("/parent/two");
+CuratorOp op4 = client.transactionOp().create().forPath("/parent/three");
+CuratorOp op5 = client.transactionOp().create().forPath("/main", someData);
+{code}
+
+All 5 of these operations would be combined into a migration and set:
+
+{code}
+Migration migration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+MigrationSet set = MigrationSet.build("main", Collections.singletonList(migration));
+{code}
+
+This set can then be passed to a {{MigrationManager}} for processing. The MigrationManager
+checks to see if the migration has been applied already and, if not, processes the transaction.
+
+At a future date, the migration set could be expanded to update/modify things. E.g.
+
+{code}
+CuratorOp newOp1 = client.transactionOp().create().forPath("/new");
+CuratorOp newOp2 = client.transactionOp().delete().forPath("/main");    // maybe this is no longer needed
+{code}
+
+This would be combined with the previous migration:
+
+{code}
+Migration initialMigration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+Migration newMigration = () -> Arrays.asList(newOp1, newOp2);
+MigrationSet set = MigrationSet.build("main", Arrays.asList(initialMigration, newMigration));
+{code}
+
+When this set is run, the MigrationManager will perform both migration operations on new
+ZooKeeper databases but only the second "newMigration" on ZK databases that already have
+the first migration applied.
+
+h2. Details/Reference
+
+_Migration_
+
+A Migration is a wrapper around a list of operations that constitute one stage in a migration
+set and are applied as a single transaction.
+
+_MigrationSet_
+
+A MigrationSet is an ordered list of Migrations. Curator keeps track of which migrations in a
+set have been previously applied and only processes un\-applied migrations. Each migration
+set must have a unique identifier. Create a MigrationSet via its builder:
+
+{code}
+MigrationSet set = MigrationSet.build(migrationId, migrations);
+{code}
+
+_MigrationManager_
+
+The MigrationManager processes MigrationSets. Usually, you'd run this only on new ZooKeeper
+databases or as part of a maintenance operation to update the ZooKeeper database. E.g.
+
+{code}
+MigrationManager manager = new MigrationManager(client,
+    lockPath,       // base path for locks used by the manager
+    metaDataPath,   // base path to store the meta data
+    executor,       // the executor to use
+    lockMax         // max time to wait for locks
+);
+manager.migrate(set).exceptionally(e -> {
+    if ( e instanceof MigrationException ) {
+        // migration checksum failed, etc.
+    } else {
+        // some other kind of error
+    }
+    return null;
+});
+{code}
+
+Each migration in the set is applied in a transaction. MigrationManager stores a hash
+of all operations in a migration so that it can be compared for future operations. i.e.
+if, in the future, a migration set is attempted but the hash of one of the previous migrations
+does not match, the stage completes exceptionally with {{MigrationException}}.

http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/site.xml
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/site.xml b/curator-x-async/src/site/site.xml
index f78abc7..fc0a67a 100644
--- a/curator-x-async/src/site/site.xml
+++ b/curator-x-async/src/site/site.xml
@@ -25,7 +25,7 @@
             <link rel="stylesheet" href="../css/site.css" />
             <script type="text/javascript">
                 $(function(){
-                    if ( location &amp;&amp; location.pathname &amp;&amp; location.pathname.endsWith('/index.html') ) {
+                    if ( location &amp;&amp; location.pathname &amp;&amp; (location.pathname.endsWith('/index.html') || location.pathname.endsWith('/migrations.html')) ) {
                         $('a[title="Java 8/Async"]').parent().addClass("active");
                     } else {
                         $('a[title="Strongly Typed Models"]').parent().addClass("active");

http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 42bc76d..637aca1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -109,7 +109,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
 
         executor = Executors.newCachedThreadPool();
-        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
+        manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
     }
 
     @AfterMethod
@@ -127,7 +127,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         Migration m1 = () -> Arrays.asList(v1opA, v1opB);
         Migration m2 = () -> Collections.singletonList(v2op);
         Migration m3 = () -> Collections.singletonList(v3op);
-        MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3));
+        MigrationSet migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2, m3));
 
         complete(manager.migrate(migrationSet));
 
@@ -143,14 +143,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     public void testStaged() throws Exception
     {
         Migration m1 = () -> Arrays.asList(v1opA, v1opB);
-        MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1));
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(m1));
         complete(manager.migrate(migrationSet));
 
         ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
         complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
 
         Migration m2 = () -> Collections.singletonList(v2op);
-        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2));
+        migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2));
         complete(manager.migrate(migrationSet));
 
         ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
@@ -160,7 +160,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         });
 
         Migration m3 = () -> Collections.singletonList(v3op);
-        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3));
+        migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2, m3));
         complete(manager.migrate(migrationSet));
 
         ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
@@ -170,4 +170,30 @@ public class TestMigrationManager extends CompletableBaseClassForTests
             Assert.assertEquals(m.getLastName(), "Two");
         });
     }
+
+    @Test
+    public void testDocExample() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/parent");
+        CuratorOp op2 = client.transactionOp().create().forPath("/parent/one");
+        CuratorOp op3 = client.transactionOp().create().forPath("/parent/two");
+        CuratorOp op4 = client.transactionOp().create().forPath("/parent/three");
+        CuratorOp op5 = client.transactionOp().create().forPath("/main", "hey".getBytes());
+
+        Migration initialMigration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+        MigrationSet migrationSet = MigrationSet.build("main", Collections.singletonList(initialMigration));
+        complete(manager.migrate(migrationSet));
+
+        Assert.assertNotNull(client.unwrap().checkExists().forPath("/parent/three"));
+        Assert.assertEquals(client.unwrap().getData().forPath("/main"), "hey".getBytes());
+
+        CuratorOp newOp1 = client.transactionOp().create().forPath("/new");
+        CuratorOp newOp2 = client.transactionOp().delete().forPath("/main");    // maybe this is no longer needed
+
+        Migration newMigration = () -> Arrays.asList(newOp1, newOp2);
+        migrationSet = MigrationSet.build("main", Arrays.asList(initialMigration, newMigration));
+        complete(manager.migrate(migrationSet));
+
+        Assert.assertNull(client.unwrap().checkExists().forPath("/main"));
+    }
 }


[03/23] curator git commit: no longer used

Posted by ra...@apache.org.
no longer used


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a0c1620
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a0c1620
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a0c1620

Branch: refs/heads/master
Commit: 1a0c1620ef3b9bbac0f41dddf153ae7e83aed8c0
Parents: 1784a7c
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 08:07:36 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 08:07:36 2017 -0500

----------------------------------------------------------------------
 .../x/async/modeled/migrations/Result.java      | 23 --------------------
 1 file changed, 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/1a0c1620/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
deleted file mode 100644
index 4fcfeac..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-public interface Result
-{
-}


[07/23] curator git commit: Merge branch 'master' into CURATOR-421

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ecbd3866
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ecbd3866
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ecbd3866

Branch: refs/heads/master
Commit: ecbd386645e015a72929ce37f33772c7b5efb3d1
Parents: bc7bf4a 11be719
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 10:09:31 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 10:09:31 2017 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------



[17/23] curator git commit: more tests

Posted by ra...@apache.org.
more tests


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9385d049
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9385d049
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9385d049

Branch: refs/heads/master
Commit: 9385d0490d8c684a602b4c57489e39941a9a178b
Parents: 75118e4
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 17:34:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 17:34:00 2017 -0500

----------------------------------------------------------------------
 .../async/migrations/TestMigrationManager.java  | 88 +++++++++++++++++++-
 1 file changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9385d049/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 80a03bb..786e704 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.api.transaction.CuratorOp;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncWrappers;
 import org.apache.curator.x.async.CompletableBaseClassForTests;
 import org.apache.curator.x.async.migrations.models.ModelV1;
 import org.apache.curator.x.async.migrations.models.ModelV2;
@@ -41,12 +42,20 @@ import org.testng.annotations.Test;
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 public class TestMigrationManager extends CompletableBaseClassForTests
 {
+    private static final String LOCK_PATH = "/migrations/locks";
+    private static final String META_DATA_PATH = "/migrations/metadata";
     private AsyncCuratorFramework client;
     private ModelSpec<ModelV1> v1Spec;
     private ModelSpec<ModelV2> v2Spec;
@@ -57,6 +66,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     private CuratorOp v2op;
     private CuratorOp v3op;
     private MigrationManager manager;
+    private final AtomicReference<CountDownLatch> filterLatch = new AtomicReference<>();
 
     @BeforeMethod
     @Override
@@ -81,7 +91,27 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
 
         executor = Executors.newCachedThreadPool();
-        manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
+        manager = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMinutes(10))
+        {
+            @Override
+            protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException
+            {
+                CountDownLatch localLatch = filterLatch.getAndSet(null);
+                if ( localLatch != null )
+                {
+                    try
+                    {
+                        localLatch.await();
+                    }
+                    catch ( InterruptedException e )
+                    {
+                        Thread.currentThread().interrupt();
+                        Throwables.propagate(e);
+                    }
+                }
+                return super.filter(set, operationHashesInOrder);
+            }
+        };
         manager.debugCount = new AtomicInteger();
     }
 
@@ -260,4 +290,60 @@ public class TestMigrationManager extends CompletableBaseClassForTests
 
         Assert.assertNull(client.unwrap().checkExists().forPath("/test"));
     }
+
+    @Test
+    public void testConcurrency1() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        CountDownLatch latch = new CountDownLatch(1);
+        filterLatch.set(latch);
+        CompletionStage<Void> first = manager.migrate(migrationSet);
+
+        MigrationManager manager2 = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMillis(timing.forSleepingABit().milliseconds()));
+        try
+        {
+            complete(manager2.migrate(migrationSet));
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof AsyncWrappers.TimeoutException);
+        }
+
+        latch.countDown();
+        complete(first);
+        Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+    }
+
+    @Test
+    public void testConcurrency2() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        CountDownLatch latch = new CountDownLatch(1);
+        filterLatch.set(latch);
+        CompletionStage<Void> first = manager.migrate(migrationSet);
+
+        CompletionStage<Void> second = manager.migrate(migrationSet);
+        try
+        {
+            second.toCompletableFuture().get(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS);
+            Assert.fail("Should throw");
+        }
+        catch ( Throwable e )
+        {
+            Assert.assertTrue(Throwables.getRootCause(e) instanceof TimeoutException);
+        }
+
+        latch.countDown();
+        complete(first);
+        Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+        complete(second);
+        Assert.assertEquals(manager.debugCount.get(), 1);
+    }
 }


[10/23] curator git commit: Merge branch 'master' into CURATOR-421

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2ab172a4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2ab172a4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2ab172a4

Branch: refs/heads/master
Commit: 2ab172a45722efb08e85b6b4d230d9d04b4ce15b
Parents: c8df9a4 1b6216e
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:05:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:05:00 2017 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/curator/x/async/AsyncWrappers.java    | 6 ++++--
 .../java/org/apache/curator/x/async/TestBasicOperations.java   | 3 +--
 2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------



[06/23] curator git commit: more testing

Posted by ra...@apache.org.
more testing


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bc7bf4ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bc7bf4ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bc7bf4ac

Branch: refs/heads/master
Commit: bc7bf4ace6dcc186352f1c3731e1cae69cef2e1f
Parents: c3adc95
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 10:08:04 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 10:08:04 2017 -0500

----------------------------------------------------------------------
 .../modeled/migrations/MigrationManager.java    |  4 +-
 .../migrations/TestMigrationManager.java        | 58 ++++++++++++++++----
 2 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/bc7bf4ac/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index e59b7bf..bfb9707 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -79,7 +79,7 @@ public class MigrationManager
     {
         if ( sortedMetaData.size() > set.migrations().size() )
         {
-            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
         }
 
         int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
@@ -89,7 +89,7 @@ public class MigrationManager
             .collect(Collectors.toList());
         if ( !compareMigrations.equals(sortedMetaData) )
         {
-            throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+            throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
         }
         return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/bc7bf4ac/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index 9fcd53c..3fe5de2 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -48,11 +48,15 @@ import java.util.function.UnaryOperator;
 public class TestMigrationManager extends CompletableBaseClassForTests
 {
     private AsyncCuratorFramework client;
-    private MigrationSet migrationSet;
     private ModelSpec<ModelV1> v1Spec;
     private ModelSpec<ModelV2> v2Spec;
     private ModelSpec<ModelV3> v3Spec;
     private ExecutorService executor;
+    private CuratorOp v1opA;
+    private CuratorOp v1opB;
+    private CuratorOp v2op;
+    private CuratorOp v3op;
+    private MigrationManager manager;
 
     @BeforeMethod
     @Override
@@ -99,17 +103,13 @@ public class TestMigrationManager extends CompletableBaseClassForTests
         v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
         v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
 
-        CuratorOp v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
-        CuratorOp v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
-        CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
-        CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
-
-        Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
-        Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
-        Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
-        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
+        v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
+        v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+        v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+        v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
 
         executor = Executors.newCachedThreadPool();
+        manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
     }
 
     @AfterMethod
@@ -124,7 +124,43 @@ public class TestMigrationManager extends CompletableBaseClassForTests
     @Test
     public void testBasic() throws Exception
     {
-        MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+        Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
+        Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+        Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+        MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
+
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
+        });
+    }
+
+    @Test
+    public void testStaged() throws Exception
+    {
+        Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
+        MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
+        complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
+
+        Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
+        complete(v2Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getName(), "Test 2");
+            Assert.assertEquals(m.getAge(), 10);
+        });
+
+        Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
         complete(manager.migrate(migrationSet));
 
         ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);


[19/23] curator git commit: Merge branch 'master' into CURATOR-421

Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/84191f3d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/84191f3d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/84191f3d

Branch: refs/heads/master
Commit: 84191f3d46d015cb49ebebb42fddc1019b3db6c3
Parents: 9d30a8c 123f2ec
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:03:53 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:03:53 2017 -0500

----------------------------------------------------------------------

----------------------------------------------------------------------