You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/18 13:16:23 UTC
[01/23] curator git commit: initial work on migrations done. Needs
lots of testing and doc. However,
found bug CURATOR-423 and will need to fix that first
Repository: curator
Updated Branches:
refs/heads/master 7d4f06238 -> 7a60af0dd
initial work on migrations done. Needs lots of testing and doc. However, found bug CURATOR-423 and will need to fix that first
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4cd731c9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4cd731c9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4cd731c9
Branch: refs/heads/master
Commit: 4cd731c92d34a075d9d5a9610ec2610d69aa7792
Parents: 716fb4a
Author: randgalt <ra...@apache.org>
Authored: Thu Jul 13 23:44:59 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jul 13 23:44:59 2017 -0500
----------------------------------------------------------------------
.../apache/curator/x/async/AsyncWrappers.java | 38 ++--
.../x/async/modeled/ModelSerializer.java | 18 ++
.../async/modeled/ModeledFrameworkBuilder.java | 20 +-
.../curator/x/async/modeled/ModeledOptions.java | 29 +++
.../modeled/details/ModeledFrameworkImpl.java | 33 +++-
.../InvalidMigrationSetException.java | 37 ++++
.../x/async/modeled/migrations/MetaData.java | 84 +++++++++
.../x/async/modeled/migrations/Migration.java | 57 ++++++
.../modeled/migrations/MigrationManager.java | 37 ++++
.../migrations/MigrationManagerBuilder.java | 68 +++++++
.../migrations/MigrationManagerImpl.java | 181 +++++++++++++++++++
.../async/modeled/migrations/MigrationSet.java | 69 +++++++
.../x/async/modeled/migrations/Result.java | 23 +++
.../x/async/CompletableBaseClassForTests.java | 8 +-
.../migrations/TestMigrationManager.java | 129 +++++++++++++
.../modeled/migrations/models/ModelV1.java | 39 ++++
.../modeled/migrations/models/ModelV2.java | 46 +++++
.../modeled/migrations/models/ModelV3.java | 53 ++++++
18 files changed, 934 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index e982cf2..7da82fc 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -20,7 +20,10 @@ package org.apache.curator.x.async;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.utils.ThreadUtils;
+import org.apache.curator.x.async.api.ExistsOption;
import org.apache.curator.x.async.modeled.ZPath;
+import java.util.Collections;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
@@ -67,38 +70,21 @@ public class AsyncWrappers
{
/**
* Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
- * the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
- *
- * @param client client
- * @param path path to ensure
- * @return stage
- */
- public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
- {
- return asyncEnsureContainers(client, path, null);
- }
-
- /**
- * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
* the given executor
*
* @param client client
* @param path path to ensure
* @return stage
*/
- public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path, Executor executor)
+ public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, ZPath path)
{
- Runnable proc = () -> {
- try
- {
- client.unwrap().createContainers(path.fullPath());
- }
- catch ( Exception e )
- {
- throw new RuntimeException(e);
- }
- };
- return (executor != null) ? CompletableFuture.runAsync(proc, executor) : CompletableFuture.runAsync(proc);
+ Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
+ return client
+ .checkExists()
+ .withOptions(options)
+ .forPath(path.child("foo").fullPath())
+ .thenApply(__ -> null)
+ ;
}
/**
@@ -284,7 +270,7 @@ public class AsyncWrappers
future.complete(null);
}
}
- catch ( Exception e )
+ catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
future.completeExceptionally(e);
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
index 428096e..476f314 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModelSerializer.java
@@ -40,4 +40,22 @@ public interface ModelSerializer<T>
* @throws RuntimeException if <code>bytes</code> is invalid or there was an error deserializing
*/
T deserialize(byte[] bytes);
+
+ /**
+ * A pass through serializer for callers that work directly with raw bytes:
+ * both {@code serialize} and {@code deserialize} return the very array they
+ * are given (no copy is made)
+ */
+ ModelSerializer<byte[]> raw = new ModelSerializer<byte[]>()
+ {
+ @Override
+ public byte[] serialize(byte[] model)
+ {
+ return model;
+ }
+
+ @Override
+ public byte[] deserialize(byte[] bytes)
+ {
+ return bytes;
+ }
+ };
}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
index 2e8bec3..1df68e6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledFrameworkBuilder.java
@@ -18,13 +18,16 @@
*/
package org.apache.curator.x.async.modeled;
+import com.google.common.collect.ImmutableSet;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.WatchMode;
import org.apache.curator.x.async.modeled.details.ModeledFrameworkImpl;
import org.apache.zookeeper.WatchedEvent;
+import java.util.Collections;
import java.util.Objects;
+import java.util.Set;
import java.util.function.UnaryOperator;
public class ModeledFrameworkBuilder<T>
@@ -35,6 +38,7 @@ public class ModeledFrameworkBuilder<T>
private UnaryOperator<WatchedEvent> watcherFilter;
private UnhandledErrorListener unhandledErrorListener;
private UnaryOperator<CuratorEvent> resultFilter;
+ private Set<ModeledOptions> modeledOptions;
/**
* Build a new ModeledFramework instance
@@ -49,7 +53,8 @@ public class ModeledFrameworkBuilder<T>
watchMode,
watcherFilter,
unhandledErrorListener,
- resultFilter
+ resultFilter,
+ modeledOptions
);
}
@@ -142,6 +147,18 @@ public class ModeledFrameworkBuilder<T>
return this;
}
+ /**
+ * Change the modeled options (the given set is defensively copied)
+ *
+ * @param modeledOptions new options set, cannot be null
+ * @return this for chaining
+ */
+ public ModeledFrameworkBuilder<T> withOptions(Set<ModeledOptions> modeledOptions)
+ {
+ this.modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null"));
+ return this;
+ }
+
ModeledFrameworkBuilder()
{
}
@@ -150,5 +167,6 @@ public class ModeledFrameworkBuilder<T>
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.modelSpec = Objects.requireNonNull(modelSpec, "modelSpec cannot be null");
+ modeledOptions = Collections.singleton(ModeledOptions.ignoreMissingNodesForChildren);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
new file mode 100644
index 0000000..434894b
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/ModeledOptions.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled;
+
+/**
+ * Options that can be passed to {@link ModeledFrameworkBuilder#withOptions(java.util.Set)}
+ * to alter the behavior of the resulting {@link ModeledFramework}
+ */
+public enum ModeledOptions
+{
+ /**
+ * Causes {@link ModeledFramework#children()} and {@link ModeledFramework#childrenAsZNodes()}
+ * to ignore {@link org.apache.zookeeper.KeeperException.NoNodeException} and merely return
+ * an empty list
+ */
+ ignoreMissingNodesForChildren
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
index c1d19c4..44011ee 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java
@@ -19,6 +19,8 @@
package org.apache.curator.x.async.modeled.details;
import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
@@ -36,13 +38,16 @@ import org.apache.curator.x.async.api.CreateOption;
import org.apache.curator.x.async.api.WatchableAsyncCuratorFramework;
import org.apache.curator.x.async.modeled.ModelSpec;
import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ModeledOptions;
import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.curator.x.async.modeled.cached.CachedModeledFramework;
import org.apache.curator.x.async.modeled.versioned.VersionedModeledFramework;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
@@ -62,13 +67,15 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
private final UnaryOperator<CuratorEvent> resultFilter;
private final AsyncCuratorFrameworkDsl dslClient;
private final boolean isWatched;
+ private final Set<ModeledOptions> modeledOptions;
- public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter)
+ public static <T> ModeledFrameworkImpl<T> build(AsyncCuratorFramework client, ModelSpec<T> model, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, Set<ModeledOptions> modeledOptions)
{
boolean isWatched = (watchMode != null);
Objects.requireNonNull(client, "client cannot be null");
Objects.requireNonNull(model, "model cannot be null");
+ modeledOptions = ImmutableSet.copyOf(Objects.requireNonNull(modeledOptions, "modeledOptions cannot be null"));
watchMode = (watchMode != null) ? watchMode : WatchMode.stateChangeAndSuccess;
@@ -84,11 +91,12 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
watcherFilter,
unhandledErrorListener,
resultFilter,
- isWatched
+ isWatched,
+ modeledOptions
);
}
- private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched)
+ private ModeledFrameworkImpl(AsyncCuratorFramework client, AsyncCuratorFrameworkDsl dslClient, WatchableAsyncCuratorFramework watchableClient, ModelSpec<T> modelSpec, WatchMode watchMode, UnaryOperator<WatchedEvent> watcherFilter, UnhandledErrorListener unhandledErrorListener, UnaryOperator<CuratorEvent> resultFilter, boolean isWatched, Set<ModeledOptions> modeledOptions)
{
this.client = client;
this.dslClient = dslClient;
@@ -99,6 +107,7 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
this.unhandledErrorListener = unhandledErrorListener;
this.resultFilter = resultFilter;
this.isWatched = isWatched;
+ this.modeledOptions = modeledOptions;
}
@Override
@@ -280,7 +289,14 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
asyncStage.whenComplete((children, e) -> {
if ( e != null )
{
- modelStage.completeExceptionally(e);
+ if ( modeledOptions.contains(ModeledOptions.ignoreMissingNodesForChildren) && (Throwables.getRootCause(e) instanceof KeeperException.NoNodeException) )
+ {
+ modelStage.complete(Collections.emptyList());
+ }
+ else
+ {
+ modelStage.completeExceptionally(e);
+ }
}
else
{
@@ -303,7 +319,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
watcherFilter,
unhandledErrorListener,
resultFilter,
- isWatched
+ isWatched,
+ modeledOptions
);
}
@@ -320,7 +337,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
watcherFilter,
unhandledErrorListener,
resultFilter,
- isWatched
+ isWatched,
+ modeledOptions
);
}
@@ -337,7 +355,8 @@ public class ModeledFrameworkImpl<T> implements ModeledFramework<T>
watcherFilter,
unhandledErrorListener,
resultFilter,
- isWatched
+ isWatched,
+ modeledOptions
);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
new file mode 100644
index 0000000..84b21bf
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+
+/**
+ * Unchecked exception that carries the id of the migration set that was found to be invalid
+ */
+public class InvalidMigrationSetException extends RuntimeException
+{
+ private final String migrationId;
+
+ /**
+ * @param migrationId id of the offending migration set, cannot be null
+ * @param message description of the problem
+ */
+ public InvalidMigrationSetException(String migrationId, String message)
+ {
+ super(message);
+ this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+ }
+
+ /**
+ * @return the id that was passed to the constructor
+ */
+ public String getMigrationId()
+ {
+ return migrationId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
new file mode 100644
index 0000000..c2878e2
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+
+/**
+ * Immutable value object pairing a migration id with a migration version.
+ * Two instances are equal when both the id and the version are equal.
+ */
+public class MetaData
+{
+ private final String migrationId;
+ private final int migrationVersion;
+
+ /**
+ * Creates an instance with an empty id and version 0.
+ * NOTE(review): presumably here for serializers that need a no-arg constructor - confirm
+ */
+ public MetaData()
+ {
+ this("", 0);
+ }
+
+ /**
+ * @param migrationId the migration id, cannot be null
+ * @param migrationVersion the migration version
+ */
+ public MetaData(String migrationId, int migrationVersion)
+ {
+ this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+ this.migrationVersion = migrationVersion;
+ }
+
+ /**
+ * @return the migration id
+ */
+ public String getMigrationId()
+ {
+ return migrationId;
+ }
+
+ /**
+ * @return the migration version
+ */
+ public int getMigrationVersion()
+ {
+ return migrationVersion;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if ( this == o )
+ {
+ return true;
+ }
+ // exact class match is required - subclasses never compare equal
+ if ( o == null || getClass() != o.getClass() )
+ {
+ return false;
+ }
+
+ MetaData metaData = (MetaData)o;
+
+ //noinspection SimplifiableIfStatement
+ if ( migrationVersion != metaData.migrationVersion )
+ {
+ return false;
+ }
+ return migrationId.equals(metaData.migrationId);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ // combines the same two fields that equals() compares
+ int result = migrationId.hashCode();
+ result = 31 * result + migrationVersion;
+ return result;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "MetaData{" + "migrationId='" + migrationId + '\'' + ", migrationVersion=" + migrationVersion + '}';
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
new file mode 100644
index 0000000..b3919d1
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+import java.util.function.UnaryOperator;
+
+/**
+ * A single migration step: an id, a version and a function that transforms
+ * the previous bytes of a node into the new bytes
+ */
+public interface Migration
+{
+ /**
+ * @return the id of this migration
+ */
+ String id();
+
+ /**
+ * @return the version of this migration
+ */
+ int version();
+
+ /**
+ * Transform the given bytes into their migrated form
+ *
+ * @param previousBytes the bytes before this migration is applied
+ * @return the bytes after this migration is applied
+ */
+ byte[] migrate(byte[] previousBytes);
+
+ /**
+ * Build a migration from the given values
+ *
+ * @param id the id, cannot be null
+ * @param version the version
+ * @param migrateProc function that {@link #migrate(byte[])} delegates to, cannot be null
+ * @return new migration
+ */
+ static Migration build(String id, int version, UnaryOperator<byte[]> migrateProc)
+ {
+ Objects.requireNonNull(id, "id cannot be null");
+ Objects.requireNonNull(migrateProc, "migrateProc cannot be null");
+ return new Migration()
+ {
+ @Override
+ public String id()
+ {
+ return id;
+ }
+
+ @Override
+ public int version()
+ {
+ return version;
+ }
+
+ @Override
+ public byte[] migrate(byte[] previousBytes)
+ {
+ return migrateProc.apply(previousBytes);
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
new file mode 100644
index 0000000..2d5f39f
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+
+/**
+ * Entry point for running migration sets; instances are created via
+ * {@link #builder(AsyncCuratorFramework, ZPath, ModelSerializer)}
+ */
+public interface MigrationManager
+{
+ /**
+ * Read the meta data stored under the given path
+ *
+ * @param metaDataPath parent path of the meta data nodes
+ * @return stage that completes with the meta data models found
+ */
+ CompletionStage<List<MetaData>> metaData(ZPath metaDataPath);
+
+ /**
+ * Run the migration sets this manager was built with
+ *
+ * @return stage that completes when the migration sets have been processed
+ */
+ CompletionStage<Void> run();
+
+ /**
+ * Start a new builder
+ *
+ * @param client the client, cannot be null
+ * @param lockPath base path for the locks taken while migrating, cannot be null
+ * @param metaDataSerializer serializer for {@link MetaData} instances, cannot be null
+ * @return builder
+ */
+ static MigrationManagerBuilder builder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
+ {
+ return new MigrationManagerBuilder(client, lockPath, metaDataSerializer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
new file mode 100644
index 0000000..ed48242
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executor;
+
+/**
+ * Builder for {@link MigrationManager} instances. Unless changed, the executor
+ * runs tasks directly in the calling thread and the lock max is 15 seconds.
+ */
+public class MigrationManagerBuilder
+{
+ private final AsyncCuratorFramework client;
+ private final ZPath lockPath;
+ private final ModelSerializer<MetaData> metaDataSerializer;
+ private final List<MigrationSet> sets = new ArrayList<>();
+ // default executor simply invokes the task in the submitting thread
+ private Executor executor = Runnable::run;
+ private Duration lockMax = Duration.ofSeconds(15);
+
+ MigrationManagerBuilder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
+ {
+ this.client = Objects.requireNonNull(client, "client cannot be null");
+ this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+ this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
+ }
+
+ /**
+ * Build a manager from the current state of this builder
+ *
+ * @return new manager
+ */
+ public MigrationManager build()
+ {
+ return new MigrationManagerImpl(client, lockPath, metaDataSerializer, executor, lockMax, sets);
+ }
+
+ /**
+ * Change the executor handed to the manager
+ *
+ * @param executor new executor, cannot be null
+ * @return this for chaining
+ */
+ public MigrationManagerBuilder withExecutor(Executor executor)
+ {
+ this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+ return this;
+ }
+
+ /**
+ * Change the lock max duration handed to the manager
+ *
+ * @param lockMax new duration, cannot be null
+ * @return this for chaining
+ */
+ public MigrationManagerBuilder withLockMax(Duration lockMax)
+ {
+ this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+ return this;
+ }
+
+ /**
+ * Add a migration set to be processed by the manager
+ *
+ * @param set the set to add, cannot be null
+ * @return this for chaining
+ */
+ public MigrationManagerBuilder adding(MigrationSet set)
+ {
+ sets.add(Objects.requireNonNull(set, "set cannot be null"));
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
new file mode 100644
index 0000000..15c61b2
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import com.google.common.base.Throwables;
+import com.google.common.collect.ImmutableList;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.CreateMode;
+import java.time.Duration;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+/**
+ * {@link MigrationManager} implementation. Each migration set is processed while holding an
+ * {@link InterProcessSemaphoreMutex} on the child of the lock path named after the set's id.
+ * Inside the lock the stored meta data is validated against the set, the not-yet-applied
+ * migrations are run over every child node of the set's path and the node updates plus the
+ * new meta data records are committed in a single transaction.
+ */
+class MigrationManagerImpl implements MigrationManager
+{
+    private final AsyncCuratorFramework client;
+    private final ZPath lockPath;
+    private final ModelSerializer<MetaData> metaDataSerializer;
+    private final Executor executor;
+    private final Duration lockMax;
+    private final List<MigrationSet> sets;
+
+    // node name prefix of the (sequential) meta data nodes
+    private static final String META_DATA_NODE_NAME = "meta-";
+
+    /**
+     * @param client the client
+     * @param lockPath base path for the per-set locks
+     * @param metaDataSerializer serializer for the meta data nodes
+     * @param executor executor passed to the asynchronous lock acquisition
+     * @param lockMax time value passed to the asynchronous lock acquisition
+     * @param sets migration sets to process; the list is copied
+     */
+    MigrationManagerImpl(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax, List<MigrationSet> sets)
+    {
+        this.client = client;
+        this.lockPath = lockPath;
+        this.metaDataSerializer = metaDataSerializer;
+        this.executor = executor;
+        this.lockMax = lockMax;
+        this.sets = ImmutableList.copyOf(sets);
+    }
+
+    @Override
+    public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
+        return ZNode.models(modeled.childrenAsZNodes());
+    }
+
+    @Override
+    public CompletionStage<Void> run()
+    {
+        // Collect the futures directly. Keying them by set id in a Map would make
+        // Collectors.toMap throw on duplicate ids after the migrations were already started.
+        CompletableFuture<?>[] futures = sets
+            .stream()
+            .map(set -> runMigration(set).toCompletableFuture())
+            .toArray(CompletableFuture[]::new);
+        return CompletableFuture.allOf(futures);
+    }
+
+    /**
+     * Acquire the lock for the given set and then process the set
+     *
+     * @param set the set to process
+     * @return stage that completes when the set has been processed
+     */
+    private CompletionStage<Void> runMigration(MigrationSet set)
+    {
+        String lockPath = this.lockPath.child(set.id()).fullPath();
+        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
+        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
+        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+    }
+
+    /**
+     * Read the stored meta data for the set and apply whatever is missing. The lock
+     * is released when the work finishes, whether it succeeded or failed.
+     *
+     * @param lock the already acquired lock
+     * @param set the set to process
+     * @return stage that completes when the set has been processed
+     */
+    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+    {
+        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
+        return modeled.childrenAsZNodes()
+            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
+            .handle((v, e) -> {
+                // handle() runs for both outcomes so the lock is not left held on failure
+                release(lock, true);
+                if ( e != null )
+                {
+                    Throwables.propagate(e);
+                }
+                return v;
+            });
+    }
+
+    /**
+     * Create a modeled client for meta data nodes under the given path. The nodes
+     * are created as persistent sequential nodes.
+     *
+     * @param metaDataPath parent path of the meta data nodes
+     * @return modeled client
+     */
+    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
+    {
+        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+        return ModeledFramework.wrap(client, modelSpec);
+    }
+
+    /**
+     * Verify that the stored meta data is a prefix of the set's migrations: there must not
+     * be more records than migrations and each record must match the id and version of the
+     * migration at the same position.
+     *
+     * @param set the set being processed
+     * @param sortedMetaData stored meta data in application order
+     * @throws InvalidMigrationSetException if the meta data does not match the set
+     */
+    protected void checkIsValid(MigrationSet set, List<MetaData> sortedMetaData) throws InvalidMigrationSetException
+    {
+        if ( sortedMetaData.size() > set.migrations().size() )
+        {
+            throw new InvalidMigrationSetException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+        }
+
+        // the guard above ensures sortedMetaData.size() is a valid upper bound for subList
+        List<MetaData> compareMigrations = set.migrations().subList(0, sortedMetaData.size())
+            .stream()
+            .map(m -> new MetaData(m.id(), m.version()))
+            .collect(Collectors.toList());
+        if ( !compareMigrations.equals(sortedMetaData) )
+        {
+            throw new InvalidMigrationSetException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+        }
+    }
+
+    /**
+     * Validate the stored meta data and apply the migrations that have no record yet
+     *
+     * @param set the set being processed
+     * @param metaDataClient modeled client for the set's meta data path
+     * @param metaDataNodes the meta data nodes currently stored
+     * @return stage that completes when the outstanding migrations have been applied; it
+     *         completes exceptionally with {@link InvalidMigrationSetException} if validation fails
+     */
+    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
+    {
+        // sorting by full path puts the sequential meta data nodes in creation order
+        List<MetaData> sortedMetaData = metaDataNodes
+            .stream()
+            .sorted(Comparator.comparing(m -> m.path().fullPath()))
+            .map(ZNode::model)
+            .collect(Collectors.toList());
+        try
+        {
+            checkIsValid(set, sortedMetaData);
+        }
+        catch ( InvalidMigrationSetException e )
+        {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+
+        List<Migration> toBeApplied = set.migrations().subList(sortedMetaData.size(), set.migrations().size());
+        if ( toBeApplied.isEmpty() )
+        {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
+            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, metaDataClient));
+    }
+
+    /**
+     * Run the given migrations over every child node of the set's path and commit the
+     * updated nodes together with one meta data record per migration in one transaction
+     *
+     * @param set the set being processed
+     * @param toBeApplied the migrations that have not been applied yet, in order
+     * @param metaDataClient modeled client for the set's meta data path
+     * @return stage that completes when the transaction has completed
+     */
+    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
+    {
+        ModelSpec<byte[]> modelSpec = ModelSpec.builder(set.path(), ModelSerializer.raw).build();
+        ModeledFramework<byte[]> modeled = ModeledFramework.wrap(client, modelSpec);
+        return modeled.childrenAsZNodes().thenCompose(nodes -> {
+            List<CuratorOp> operations = new ArrayList<>();
+
+            // Record each migration exactly once, independent of the number of nodes.
+            // Recording it once per node would leave more meta data records than
+            // migrations and make the next run fail validation.
+            for ( Migration migration : toBeApplied )
+            {
+                MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+                operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
+            }
+
+            for ( ZNode<byte[]> node : nodes )
+            {
+                // chain the migrations: each one receives the output of the previous one
+                byte[] currentBytes = node.model();
+                for ( Migration migration : toBeApplied )
+                {
+                    currentBytes = migration.migrate(currentBytes);
+                }
+                // the update is conditional on the version that was read
+                operations.add(modeled.child(node.path().nodeName()).updateOp(currentBytes, node.stat().getVersion()));
+            }
+            return client.transaction().forOperations(operations).thenApply(__ -> null);
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
new file mode 100644
index 0000000..9e41989
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.curator.x.async.modeled.ZPath;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Describes a group of migrations together with the path they apply to and the
+ * path under which their meta data is kept.
+ */
+public interface MigrationSet
+{
+ /**
+ * @return the ID of this migration set
+ */
+ String id();
+
+ /**
+ * @return the path this migration set applies to
+ */
+ ZPath path();
+
+ /**
+ * @return the path used for the meta data of this migration set
+ */
+ ZPath metaDataPath();
+
+ /**
+ * @return the migrations of this set
+ */
+ List<Migration> migrations();
+
+ /**
+ * Creates an immutable migration set from the given values. The list of migrations
+ * is copied, so later changes to the argument do not affect the returned set.
+ *
+ * @param id set ID, cannot be null
+ * @param path path the set applies to, cannot be null
+ * @param metaDataPath path for the set's meta data, cannot be null
+ * @param migrations the migrations, cannot be null or contain null
+ * @return the new migration set
+ * @throws NullPointerException if any argument is null
+ */
+ static MigrationSet build(String id, ZPath path, ZPath metaDataPath, List<Migration> migrations)
+ {
+ Objects.requireNonNull(id, "id cannot be null");
+ Objects.requireNonNull(path, "path cannot be null");
+ Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
+ // validate explicitly so a null list is reported like the other arguments
+ Objects.requireNonNull(migrations, "migrations cannot be null");
+ final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
+ return new MigrationSet()
+ {
+ @Override
+ public String id()
+ {
+ return id;
+ }
+
+ @Override
+ public ZPath path()
+ {
+ return path;
+ }
+
+ @Override
+ public ZPath metaDataPath()
+ {
+ return metaDataPath;
+ }
+
+ @Override
+ public List<Migration> migrations()
+ {
+ return migrationsCopy;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
new file mode 100644
index 0000000..4fcfeac
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+/**
+ * Empty interface in the migrations package; it declares no members.
+ * NOTE(review): no implementation or usage is visible in this part of the change -
+ * confirm the intended purpose before relying on it.
+ */
+public interface Result
+{
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
index 232d301..4a964b1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/CompletableBaseClassForTests.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.x.async;
+import com.google.common.base.Throwables;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.Timing;
import org.testng.Assert;
@@ -33,7 +34,12 @@ public abstract class CompletableBaseClassForTests extends BaseClassForTests
protected <T, U> void complete(CompletionStage<T> stage)
{
- complete(stage, (v, e) -> {});
+ complete(stage, (v, e) -> {
+ if ( e != null )
+ {
+ Throwables.propagate(e);
+ }
+ });
}
protected <T, U> void complete(CompletionStage<T> stage, BiConsumer<? super T, Throwable> handler)
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
new file mode 100644
index 0000000..d709abe
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.modeled.JacksonModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.modeled.migrations.models.ModelV1;
+import org.apache.curator.x.async.modeled.migrations.models.ModelV2;
+import org.apache.curator.x.async.modeled.migrations.models.ModelV3;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.function.UnaryOperator;
+
+/**
+ * Tests for the migration manager: stores a V1 model, runs a migration set that
+ * converts V1 to V2 and V2 to V3, and verifies the stored node reads back as V3.
+ */
+public class TestMigrationManager extends CompletableBaseClassForTests
+{
+ private AsyncCuratorFramework client;
+ private MigrationSet migrationSet;
+ private ModelSpec<ModelV1> v1Spec;
+ private ModelSpec<ModelV2> v2Spec;
+ private ModelSpec<ModelV3> v3Spec;
+
+ /**
+ * Starts a client against the test server and prepares the two-step migration set
+ * plus one model spec per model version, all on the same model path.
+ */
+ @BeforeMethod
+ @Override
+ public void setup() throws Exception
+ {
+ super.setup();
+
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+ client.start();
+
+ this.client = AsyncCuratorFramework.wrap(client);
+
+ ObjectMapper mapper = new ObjectMapper();
+ // V1 -> V2: keeps the name and sets a fixed age of 64
+ UnaryOperator<byte[]> from1to2 = bytes -> {
+ try
+ {
+ ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
+ ModelV2 v2 = new ModelV2(v1.getName(), 64);
+ return mapper.writeValueAsBytes(v2);
+ }
+ catch ( IOException e )
+ {
+ throw new RuntimeException(e);
+ }
+ };
+
+ // V2 -> V3: splits the name on whitespace into first and last name.
+ // Assumes the name has at least two whitespace-separated parts.
+ UnaryOperator<byte[]> from2to3 = bytes -> {
+ try
+ {
+ ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
+ String[] nameParts = v2.getName().split("\\s");
+ ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
+ return mapper.writeValueAsBytes(v3);
+ }
+ catch ( IOException e )
+ {
+ throw new RuntimeException(e);
+ }
+ };
+
+ ZPath modelPath = ZPath.parse("/test/it");
+
+ Migration m1 = Migration.build("1",1, from1to2);
+ Migration m2 = Migration.build("2",1, from2to3);
+ migrationSet = MigrationSet.build("1", modelPath, ZPath.parse("/metadata"), Arrays.asList(m1, m2));
+
+ v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
+ v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
+ v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+ }
+
+ /**
+ * Closes the client created in {@link #setup()} before the base class teardown runs.
+ */
+ @AfterMethod
+ @Override
+ public void teardown() throws Exception
+ {
+ CloseableUtils.closeQuietly(client.unwrap());
+ super.teardown();
+ }
+
+ /**
+ * Writes a V1 model, runs the migration manager and checks that the same node
+ * can be read as a V3 model with the migrated values.
+ */
+ @Test
+ public void testBasic() throws Exception
+ {
+ ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
+ ModelV1 v1 = new ModelV1("John Galt");
+ complete(v1Client.child("1").set(v1));
+
+ MigrationManager manager = MigrationManager.builder(this.client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class))
+ .adding(migrationSet)
+ .build();
+
+ complete(manager.run());
+
+ ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+ complete(v3Client.child("1").read(), (m, e) -> {
+ // report a failed read as such instead of a NullPointerException on the model
+ Assert.assertNull(e, "reading the migrated model failed");
+ Assert.assertEquals(m.getAge(), 64);
+ Assert.assertEquals(m.getFirstName(), "John");
+ Assert.assertEquals(m.getLastName(), "Galt");
+ });
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
new file mode 100644
index 0000000..02b13b7
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations.models;
+
+/**
+ * First version of the test model: an immutable holder for a single name.
+ */
+public class ModelV1
+{
+ private final String name;
+
+ /**
+ * Creates a model whose name is the empty string.
+ */
+ public ModelV1()
+ {
+ name = "";
+ }
+
+ /**
+ * @param name the name to hold
+ */
+ public ModelV1(String name)
+ {
+ this.name = name;
+ }
+
+ /**
+ * @return the name given at construction
+ */
+ public String getName()
+ {
+ return this.name;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
new file mode 100644
index 0000000..bd77a2e
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations.models;
+
+/**
+ * Second version of the test model: an immutable holder for a name and an age.
+ */
+public class ModelV2
+{
+ private final String name;
+ private final int age;
+
+ /**
+ * Creates a model with an empty name and an age of 0.
+ */
+ public ModelV2()
+ {
+ name = "";
+ age = 0;
+ }
+
+ /**
+ * @param name the name to hold
+ * @param age the age to hold
+ */
+ public ModelV2(String name, int age)
+ {
+ this.age = age;
+ this.name = name;
+ }
+
+ /**
+ * @return the name given at construction
+ */
+ public String getName()
+ {
+ return this.name;
+ }
+
+ /**
+ * @return the age given at construction
+ */
+ public int getAge()
+ {
+ return this.age;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/4cd731c9/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
new file mode 100644
index 0000000..d4713b8
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations.models;
+
+/**
+ * Third version of the test model: an immutable holder for a first name,
+ * a last name and an age.
+ */
+public class ModelV3
+{
+ private final String firstName;
+ private final String lastName;
+ private final int age;
+
+ /**
+ * Creates a model with empty names and an age of 0.
+ */
+ public ModelV3()
+ {
+ firstName = "";
+ lastName = "";
+ age = 0;
+ }
+
+ /**
+ * @param firstName the first name to hold
+ * @param lastName the last name to hold
+ * @param age the age to hold
+ */
+ public ModelV3(String firstName, String lastName, int age)
+ {
+ this.age = age;
+ this.lastName = lastName;
+ this.firstName = firstName;
+ }
+
+ /**
+ * @return the first name given at construction
+ */
+ public String getFirstName()
+ {
+ return this.firstName;
+ }
+
+ /**
+ * @return the last name given at construction
+ */
+ public String getLastName()
+ {
+ return this.lastName;
+ }
+
+ /**
+ * @return the age given at construction
+ */
+ public int getAge()
+ {
+ return this.age;
+ }
+}
[16/23] curator git commit: more tests
Posted by ra...@apache.org.
more tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/75118e43
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/75118e43
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/75118e43
Branch: refs/heads/master
Commit: 75118e43b165d2e99a432161cee6a3dab55e3e4e
Parents: f7e728b
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 17:15:32 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 17:15:32 2017 -0500
----------------------------------------------------------------------
.../async/migrations/TestMigrationManager.java | 50 ++++++++++++++++++--
1 file changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/75118e43/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 3522911..80a03bb 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -33,6 +33,7 @@ import org.apache.curator.x.async.modeled.JacksonModelSerializer;
import org.apache.curator.x.async.modeled.ModelSpec;
import org.apache.curator.x.async.modeled.ModeledFramework;
import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -94,7 +95,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
}
@Test
- public void testBasic() throws Exception
+ public void testBasic()
{
Migration m1 = () -> Arrays.asList(v1opA, v1opB);
Migration m2 = () -> Collections.singletonList(v2op);
@@ -116,7 +117,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
}
@Test
- public void testStaged() throws Exception
+ public void testStaged()
{
Migration m1 = () -> Arrays.asList(v1opA, v1opB);
MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(m1));
@@ -174,7 +175,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
}
@Test
- public void testChecksumDataError() throws Exception
+ public void testChecksumDataError()
{
CuratorOp op1 = client.transactionOp().create().forPath("/test");
CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
@@ -197,7 +198,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
}
@Test
- public void testChecksumPathError() throws Exception
+ public void testChecksumPathError()
{
CuratorOp op1 = client.transactionOp().create().forPath("/test2");
CuratorOp op2 = client.transactionOp().create().forPath("/test2/bar");
@@ -218,4 +219,45 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertTrue(Throwables.getRootCause(e) instanceof MigrationException);
}
}
+
+ /**
+ * Runs a set of two single-operation migrations where the second one creates "/a/b/c".
+ * The run is expected to fail with a NoNodeException as root cause, while the node
+ * created by the first migration must still exist with its data afterwards.
+ */
+ @Test
+ public void testPartialApplyForBadOps() throws Exception
+ {
+ CuratorOp createTest = client.transactionOp().create().forPath("/test", "something".getBytes());
+ CuratorOp createNested = client.transactionOp().create().forPath("/a/b/c");
+ Migration first = () -> Collections.singletonList(createTest);
+ Migration second = () -> Collections.singletonList(createNested);
+ MigrationSet migrationSet = MigrationSet.build("1", Arrays.asList(first, second));
+ try
+ {
+ complete(manager.migrate(migrationSet));
+ Assert.fail("Should throw");
+ }
+ catch ( Throwable e )
+ {
+ Throwable rootCause = Throwables.getRootCause(e);
+ Assert.assertTrue(rootCause instanceof KeeperException.NoNodeException);
+ }
+
+ // the first migration was applied on its own, so its node must be present
+ byte[] stored = client.unwrap().getData().forPath("/test");
+ Assert.assertEquals(stored, "something".getBytes());
+ }
+
+ /**
+ * Runs a single migration whose two operations are applied in one transaction:
+ * creating "/test2" and creating "/a/b/c/d". The run is expected to fail with a
+ * NoNodeException as root cause, and because both operations belong to the same
+ * transaction "/test2" must not exist afterwards.
+ */
+ @Test
+ public void testTransactionForBadOps() throws Exception
+ {
+ CuratorOp op1 = client.transactionOp().create().forPath("/test2", "something".getBytes());
+ CuratorOp op2 = client.transactionOp().create().forPath("/a/b/c/d");
+ Migration migration = () -> Arrays.asList(op1, op2);
+ MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+ try
+ {
+ complete(manager.migrate(migrationSet));
+ Assert.fail("Should throw");
+ }
+ catch ( Throwable e )
+ {
+ Assert.assertTrue(Throwables.getRootCause(e) instanceof KeeperException.NoNodeException);
+ }
+
+ // check the node this transaction tried to create, so a missing rollback is detected
+ Assert.assertNull(client.unwrap().checkExists().forPath("/test2"));
+ }
}
[08/23] curator git commit: added some doc
Posted by ra...@apache.org.
added some doc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4f12abcd
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4f12abcd
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4f12abcd
Branch: refs/heads/master
Commit: 4f12abcdc0c61cae52099f47f158433ebe09326c
Parents: ecbd386
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 10:48:47 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 10:48:47 2017 -0500
----------------------------------------------------------------------
.../x/async/modeled/migrations/MetaData.java | 9 ++++
.../x/async/modeled/migrations/Migration.java | 17 +++++++
.../modeled/migrations/MigrationManager.java | 48 +++++++++++++++++---
.../async/modeled/migrations/MigrationSet.java | 13 ++++++
4 files changed, 81 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/4f12abcd/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
index c2878e2..da40a5b 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
@@ -20,6 +20,9 @@ package org.apache.curator.x.async.modeled.migrations;
import java.util.Objects;
+/**
+ * The meta data of a single migration
+ */
public class MetaData
{
private final String migrationId;
@@ -36,11 +39,17 @@ public class MetaData
this.migrationVersion = migrationVersion;
}
+ /**
+ * @return The ID of the migration that was applied
+ */
public String getMigrationId()
{
return migrationId;
}
+ /**
+ * @return the version of the migration that was applied
+ */
public int getMigrationVersion()
{
return migrationVersion;
http://git-wip-us.apache.org/repos/asf/curator/blob/4f12abcd/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index 63a7a7d..b456580 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -23,14 +23,31 @@ import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;
+/**
+ * Models a single migration/transition
+ */
public interface Migration
{
+ /**
+ * @return the unique ID for this migration
+ */
String id();
+ /**
+ * @return the version of this migration
+ */
int version();
+ /**
+ * @return the operations to execute in a transaction
+ */
List<CuratorOp> operations();
+ static Migration build(String id, Supplier<List<CuratorOp>> operationsProc)
+ {
+ return build(id, 1, operationsProc);
+ }
+
static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
{
Objects.requireNonNull(id, "id cannot be null");
http://git-wip-us.apache.org/repos/asf/curator/blob/4f12abcd/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index bfb9707..01de2f8 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -42,6 +42,9 @@ import java.util.stream.Collectors;
import static org.apache.curator.x.async.AsyncWrappers.*;
+/**
+ * Manages migrations
+ */
public class MigrationManager
{
private final AsyncCuratorFramework client;
@@ -52,6 +55,16 @@ public class MigrationManager
private static final String META_DATA_NODE_NAME = "meta-";
+ /**
+ * Jackson usage: See the note in {@link org.apache.curator.x.async.modeled.JacksonModelSerializer} regarding how the Jackson library is specified in Curator's Maven file.
+ * Unless you are not using Jackson, pass <code>JacksonModelSerializer.build(MetaData.class)</code> for <code>metaDataSerializer</code>.
+ *
+ * @param client the curator client
+ * @param lockPath base path for locks used by the manager
+ * @param metaDataSerializer JacksonModelSerializer.build(MetaData.class)
+ * @param executor the executor to use
+ * @param lockMax max time to wait for locks
+ */
public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
@@ -61,12 +74,13 @@ public class MigrationManager
this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
}
- public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
- {
- ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
- return ZNode.models(modeled.childrenAsZNodes());
- }
-
+ /**
+ * Process the given migration set
+ *
+ * @param set the set
+ * @return completion stage. If there is a migration-specific error, the stage will be completed
+ * exceptionally with {@link org.apache.curator.x.async.modeled.migrations.MigrationException}.
+ */
public CompletionStage<Void> migrate(MigrationSet set)
{
String lockPath = this.lockPath.child(set.id()).fullPath();
@@ -75,6 +89,28 @@ public class MigrationManager
return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
}
+ /**
+ * Utility to return the meta data from previous migrations
+ *
+ * @param set the set
+ * @return stage
+ */
+ public CompletionStage<List<MetaData>> metaData(MigrationSet set)
+ {
+ ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
+ return ZNode.models(modeled.childrenAsZNodes());
+ }
+
+ /**
+ * Can be overridden to change how the comparison to previous migrations is done. The default
+ * version ensures that the meta data from previous migrations matches the current migration
+ * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
+ *
+ * @param set the migration set being applied
+ * @param sortedMetaData previous migration meta data (may be empty)
+ * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
+ * @throws MigrationException errors
+ */
protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
{
if ( sortedMetaData.size() > set.migrations().size() )
http://git-wip-us.apache.org/repos/asf/curator/blob/4f12abcd/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
index 0a0dbe0..c4cd90e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
@@ -23,12 +23,25 @@ import org.apache.curator.x.async.modeled.ZPath;
import java.util.List;
import java.util.Objects;
+/**
+ * Models a set of migrations. Each individual migration is applied
+ * in a transaction.
+ */
public interface MigrationSet
{
+ /**
+ * @return the unique ID for this migration set
+ */
String id();
+ /**
+ * @return where to store the meta data for this migration set
+ */
ZPath metaDataPath();
+ /**
+ * @return list of migrations in the order that they should be applied
+ */
List<Migration> migrations();
static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
[04/23] curator git commit: reworking so that this feature is more
general. Now manages any set of transactions
Posted by ra...@apache.org.
reworking so that this feature is more general. Now manages any set of transactions
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a15582b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a15582b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a15582b
Branch: refs/heads/master
Commit: 1a15582b59ccfaf80b1547fd57bcb7dfe894b1c8
Parents: 1a0c162
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 09:24:33 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 09:24:33 2017 -0500
----------------------------------------------------------------------
.../InvalidMigrationSetException.java | 37 ----
.../x/async/modeled/migrations/Migration.java | 14 +-
.../modeled/migrations/MigrationException.java | 37 ++++
.../modeled/migrations/MigrationManager.java | 132 +++++++++++++-
.../migrations/MigrationManagerBuilder.java | 68 -------
.../migrations/MigrationManagerImpl.java | 181 -------------------
.../async/modeled/migrations/MigrationSet.java | 11 +-
.../migrations/TestMigrationManager.java | 35 ++--
8 files changed, 196 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
deleted file mode 100644
index 84b21bf..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import java.util.Objects;
-
-public class InvalidMigrationSetException extends RuntimeException
-{
- private final String migrationId;
-
- public InvalidMigrationSetException(String migrationId, String message)
- {
- super(message);
- this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
- }
-
- public String getMigrationId()
- {
- return migrationId;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index b3919d1..63a7a7d 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -18,8 +18,10 @@
*/
package org.apache.curator.x.async.modeled.migrations;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import java.util.List;
import java.util.Objects;
-import java.util.function.UnaryOperator;
+import java.util.function.Supplier;
public interface Migration
{
@@ -27,12 +29,12 @@ public interface Migration
int version();
- byte[] migrate(byte[] previousBytes);
+ List<CuratorOp> operations();
- static Migration build(String id, int version, UnaryOperator<byte[]> migrateProc)
+ static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
{
Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(migrateProc, "migrateProc cannot be null");
+ Objects.requireNonNull(operationsProc, "operationsProc cannot be null");
return new Migration()
{
@Override
@@ -48,9 +50,9 @@ public interface Migration
}
@Override
- public byte[] migrate(byte[] previousBytes)
+ public List<CuratorOp> operations()
{
- return migrateProc.apply(previousBytes);
+ return operationsProc.get();
}
};
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
new file mode 100644
index 0000000..1a1d59c
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+
+public class MigrationException extends RuntimeException
+{
+ private final String migrationId;
+
+ public MigrationException(String migrationId, String message)
+ {
+ super(message);
+ this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+ }
+
+ public String getMigrationId()
+ {
+ return migrationId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 2d5f39f..47adb1e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -18,20 +18,142 @@
*/
package org.apache.curator.x.async.modeled.migrations;
+import com.google.common.base.Throwables;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.CreateMode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
-public interface MigrationManager
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+public class MigrationManager
{
- CompletionStage<List<MetaData>> metaData(ZPath metaDataPath);
+ private final AsyncCuratorFramework client;
+ private final ZPath lockPath;
+ private final ModelSerializer<MetaData> metaDataSerializer;
+ private final Executor executor;
+ private final Duration lockMax;
+
+ private static final String META_DATA_NODE_NAME = "meta-";
+
+ public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
+ {
+ this.client = Objects.requireNonNull(client, "client cannot be null");
+ this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+ this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
+ this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+ this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+ }
+
+ public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
+ {
+ ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
+ return ZNode.models(modeled.childrenAsZNodes());
+ }
+
+ public CompletionStage<Void> migrate(MigrationSet set)
+ {
+ String lockPath = this.lockPath.child(set.id()).fullPath();
+ InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
+ CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
+ return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+ }
+
+ protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
+ {
+ if ( sortedMetaData.size() > set.migrations().size() )
+ {
+ throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+ }
- CompletionStage<Void> run();
+ int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
+ List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
+ .stream()
+ .map(m -> new MetaData(m.id(), m.version()))
+ .collect(Collectors.toList());
+ if ( !compareMigrations.equals(sortedMetaData) )
+ {
+ throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+ }
+ return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
+ }
+
+ private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+ {
+ ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
+ return modeled.childrenAsZNodes()
+ .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
+ .handle((v, e) -> {
+ release(lock, true);
+ if ( e != null )
+ {
+ Throwables.propagate(e);
+ }
+ return v;
+ }
+ );
+ }
+
+ private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
+ {
+ ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+ return ModeledFramework.wrap(client, modelSpec);
+ }
+
+ private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
+ {
+ List<MetaData> sortedMetaData = metaDataNodes
+ .stream()
+ .sorted(Comparator.comparing(m -> m.path().fullPath()))
+ .map(ZNode::model)
+ .collect(Collectors.toList());
+
+ List<Migration> toBeApplied;
+ try
+ {
+ toBeApplied = filter(set, sortedMetaData);
+ }
+ catch ( MigrationException e )
+ {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+
+ if ( toBeApplied.size() == 0 )
+ {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
+ .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient));
+ }
- static MigrationManagerBuilder builder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
+ private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
{
- return new MigrationManagerBuilder(client, lockPath, metaDataSerializer);
+ List<CuratorOp> operations = new ArrayList<>();
+ for ( Migration migration : toBeApplied )
+ {
+ operations.addAll(migration.operations());
+ MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+ operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
+ }
+ return client.transaction().forOperations(operations).thenApply(__ -> null);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
deleted file mode 100644
index ed48242..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ZPath;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Executor;
-
-public class MigrationManagerBuilder
-{
- private final AsyncCuratorFramework client;
- private final ZPath lockPath;
- private final ModelSerializer<MetaData> metaDataSerializer;
- private final List<MigrationSet> sets = new ArrayList<>();
- private Executor executor = Runnable::run;
- private Duration lockMax = Duration.ofSeconds(15);
-
- MigrationManagerBuilder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
- {
- this.client = Objects.requireNonNull(client, "client cannot be null");
- this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
- this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
- }
-
- public MigrationManager build()
- {
- return new MigrationManagerImpl(client, lockPath, metaDataSerializer, executor, lockMax, sets);
- }
-
- public MigrationManagerBuilder withExecutor(Executor executor)
- {
- this.executor = Objects.requireNonNull(executor, "executor cannot be null");
- return this;
- }
-
- public MigrationManagerBuilder withLockMax(Duration lockMax)
- {
- this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
- return this;
- }
-
- public MigrationManagerBuilder adding(MigrationSet set)
- {
- sets.add(Objects.requireNonNull(set, "set cannot be null"));
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
deleted file mode 100644
index 15c61b2..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZNode;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.CreateMode;
-import java.time.Duration;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.curator.x.async.AsyncWrappers.*;
-
-class MigrationManagerImpl implements MigrationManager
-{
- private final AsyncCuratorFramework client;
- private final ZPath lockPath;
- private final ModelSerializer<MetaData> metaDataSerializer;
- private final Executor executor;
- private final Duration lockMax;
- private final List<MigrationSet> sets;
-
- private static final String META_DATA_NODE_NAME = "meta-";
-
- MigrationManagerImpl(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax, List<MigrationSet> sets)
- {
- this.client = client;
- this.lockPath = lockPath;
- this.metaDataSerializer = metaDataSerializer;
- this.executor = executor;
- this.lockMax = lockMax;
- this.sets = ImmutableList.copyOf(sets);
- }
-
- @Override
- public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
- {
- ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
- return ZNode.models(modeled.childrenAsZNodes());
- }
-
- @Override
- public CompletionStage<Void> run()
- {
- Map<String, CompletableFuture<Void>> futures = sets
- .stream()
- .map(m -> new AbstractMap.SimpleEntry<>(m.id(), runMigration(m).toCompletableFuture()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[futures.size()]));
- }
-
- private CompletionStage<Void> runMigration(MigrationSet set)
- {
- String lockPath = this.lockPath.child(set.id()).fullPath();
- InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
- CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
- return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
- }
-
- private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
- {
- ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
- return modeled.childrenAsZNodes()
- .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
- .handle((v, e) -> {
- release(lock, true);
- if ( e != null )
- {
- Throwables.propagate(e);
- }
- return v;
- }
- );
- }
-
- private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
- {
- ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
- return ModeledFramework.wrap(client, modelSpec);
- }
-
- protected void checkIsValid(MigrationSet set, List<MetaData> sortedMetaData) throws InvalidMigrationSetException
- {
- if ( sortedMetaData.size() > set.migrations().size() )
- {
- throw new InvalidMigrationSetException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
- }
-
- int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
- List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
- .stream()
- .map(m -> new MetaData(m.id(), m.version()))
- .collect(Collectors.toList());
- if ( !compareMigrations.equals(sortedMetaData) )
- {
- throw new InvalidMigrationSetException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
- }
- }
-
- private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
- {
- List<MetaData> sortedMetaData = metaDataNodes
- .stream()
- .sorted(Comparator.comparing(m -> m.path().fullPath()))
- .map(ZNode::model)
- .collect(Collectors.toList());
- try
- {
- checkIsValid(set, sortedMetaData);
- }
- catch ( InvalidMigrationSetException e )
- {
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.completeExceptionally(e);
- return future;
- }
-
- List<Migration> toBeApplied = set.migrations().subList(sortedMetaData.size(), set.migrations().size());
- if ( toBeApplied.size() == 0 )
- {
- return CompletableFuture.completedFuture(null);
- }
-
- return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
- .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, metaDataClient));
- }
-
- private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
- {
- ModelSpec<byte[]> modelSpec = ModelSpec.builder(set.path(), ModelSerializer.raw).build();
- ModeledFramework<byte[]> modeled = ModeledFramework.wrap(client, modelSpec);
- return modeled.childrenAsZNodes().thenCompose(nodes -> {
- List<CuratorOp> operations = new ArrayList<>();
- for ( ZNode<byte[]> node : nodes )
- {
- byte[] currentBytes = node.model();
- for ( Migration migration : toBeApplied )
- {
- currentBytes = migration.migrate(currentBytes);
- MetaData thisMetaData = new MetaData(migration.id(), migration.version());
- operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
- }
- operations.add(modeled.child(node.path().nodeName()).updateOp(currentBytes, node.stat().getVersion()));
- }
- return client.transaction().forOperations(operations).thenApply(__ -> null);
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
index 9e41989..0a0dbe0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
@@ -27,16 +27,13 @@ public interface MigrationSet
{
String id();
- ZPath path();
-
ZPath metaDataPath();
List<Migration> migrations();
- static MigrationSet build(String id, ZPath path, ZPath metaDataPath, List<Migration> migrations)
+ static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
{
Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(path, "path cannot be null");
Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
return new MigrationSet()
@@ -48,12 +45,6 @@ public interface MigrationSet
}
@Override
- public ZPath path()
- {
- return path;
- }
-
- @Override
public ZPath metaDataPath()
{
return metaDataPath;
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index d709abe..daf69cd 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.migrations;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -37,7 +38,11 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;
public class TestMigrationManager extends CompletableBaseClassForTests
@@ -47,6 +52,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
private ModelSpec<ModelV1> v1Spec;
private ModelSpec<ModelV2> v2Spec;
private ModelSpec<ModelV3> v3Spec;
+ private ExecutorService executor;
@BeforeMethod
@Override
@@ -54,10 +60,10 @@ public class TestMigrationManager extends CompletableBaseClassForTests
{
super.setup();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
- client.start();
+ CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+ rawClient.start();
- this.client = AsyncCuratorFramework.wrap(client);
+ this.client = AsyncCuratorFramework.wrap(rawClient);
ObjectMapper mapper = new ObjectMapper();
UnaryOperator<byte[]> from1to2 = bytes -> {
@@ -89,13 +95,20 @@ public class TestMigrationManager extends CompletableBaseClassForTests
ZPath modelPath = ZPath.parse("/test/it");
- Migration m1 = Migration.build("1",1, from1to2);
- Migration m2 = Migration.build("2",1, from2to3);
- migrationSet = MigrationSet.build("1", modelPath, ZPath.parse("/metadata"), Arrays.asList(m1, m2));
-
v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+
+ CuratorOp v1op = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+ CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+ CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
+
+ Migration m1 = Migration.build("1",1, () -> Collections.singletonList(v1op));
+ Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+ Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+ migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
+
+ executor = Executors.newCachedThreadPool();
}
@AfterMethod
@@ -103,6 +116,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
public void teardown() throws Exception
{
CloseableUtils.closeQuietly(client.unwrap());
+ executor.shutdownNow();
super.teardown();
}
@@ -113,11 +127,8 @@ public class TestMigrationManager extends CompletableBaseClassForTests
ModelV1 v1 = new ModelV1("John Galt");
complete(v1Client.child("1").set(v1));
- MigrationManager manager = MigrationManager.builder(this.client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class))
- .adding(migrationSet)
- .build();
-
- complete(manager.run());
+ MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+ complete(manager.migrate(migrationSet));
ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
complete(v3Client.child("1").read(), (m, e) -> {
[14/23] curator git commit: give Migrations a link in the left menu
of help
Posted by ra...@apache.org.
give Migrations a link in the left menu of help
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/909ed9ae
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/909ed9ae
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/909ed9ae
Branch: refs/heads/master
Commit: 909ed9aeddc034a9a59868db22847801ea25b4ec
Parents: 72ff5a9
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 14:37:09 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 14:37:09 2017 -0500
----------------------------------------------------------------------
curator-x-async/src/site/site.xml | 8 +++++---
src/site/site.xml | 1 +
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/909ed9ae/curator-x-async/src/site/site.xml
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/site.xml b/curator-x-async/src/site/site.xml
index fc0a67a..70b5d4b 100644
--- a/curator-x-async/src/site/site.xml
+++ b/curator-x-async/src/site/site.xml
@@ -23,15 +23,17 @@
<body>
<head>
<link rel="stylesheet" href="../css/site.css" />
- <script type="text/javascript">
+ <script type="text/javascript"><![CDATA[
$(function(){
- if ( location && location.pathname && (location.pathname.endsWith('/index.html') || location.pathname.endsWith('/migrations.html')) ) {
+ if ( location && location.pathname && location.pathname.endsWith('/index.html') ) {
$('a[title="Java 8/Async"]').parent().addClass("active");
+ } else if ( location && location.pathname && location.pathname.endsWith('/migrations.html') ) {
+ $('a[title="Migrations"]').parent().addClass("active");
} else {
$('a[title="Strongly Typed Models"]').parent().addClass("active");
}
});
- </script>
+ ]]></script>
</head>
</body>
</project>
http://git-wip-us.apache.org/repos/asf/curator/blob/909ed9ae/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
index 83ddd46..4b1ac98 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -75,6 +75,7 @@
<item name="Client" href="curator-client/index.html"/>
<item name="Java 8/Async" href="curator-x-async/index.html"/>
<item name="Strongly Typed Models" href="curator-x-async/modeled.html"/>
+ <item name="Migrations" href="curator-x-async/migrations.html"/>
<item name="Schema Support" href="curator-framework/schema.html"/>
</menu>
[18/23] curator git commit: Added asyncEnsureParents()
Posted by ra...@apache.org.
Added asyncEnsureParents()
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9d30a8c8
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9d30a8c8
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9d30a8c8
Branch: refs/heads/master
Commit: 9d30a8c8769f803715862eb9b5479057bfd0d5af
Parents: 9385d04
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:02:30 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:02:30 2017 -0500
----------------------------------------------------------------------
.../apache/curator/x/async/AsyncWrappers.java | 36 ++++++++++++++------
1 file changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9d30a8c8/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index d7b3cc3..f26b3b4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -28,7 +28,6 @@ import org.apache.zookeeper.KeeperException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
@@ -134,8 +133,19 @@ public class AsyncWrappers
}
/**
- * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
- * the given executor
+ * Asynchronously ensure that the parents of the given path are created
+ *
+ * @param client client
+ * @param path path to ensure
+ * @return stage
+ */
+ public static CompletionStage<Void> asyncEnsureParents(AsyncCuratorFramework client, String path)
+ {
+ return ensure(client, path, ExistsOption.createParentsIfNeeded);
+ }
+
+ /**
+ * Asynchronously ensure that the parents of the given path are created as containers
*
* @param client client
* @param path path to ensure
@@ -143,14 +153,7 @@ public class AsyncWrappers
*/
public static CompletionStage<Void> asyncEnsureContainers(AsyncCuratorFramework client, String path)
{
- String localPath = ZKPaths.makePath(path, "foo");
- Set<ExistsOption> options = Collections.singleton(ExistsOption.createParentsAsContainers);
- return client
- .checkExists()
- .withOptions(options)
- .forPath(localPath)
- .thenApply(__ -> null)
- ;
+ return ensure(client, path, ExistsOption.createParentsAsContainers);
}
/**
@@ -373,6 +376,17 @@ public class AsyncWrappers
});
}
+ private static CompletionStage<Void> ensure(AsyncCuratorFramework client, String path, ExistsOption option)
+ {
+ String localPath = ZKPaths.makePath(path, "foo");
+ return client
+ .checkExists()
+ .withOptions(Collections.singleton(option))
+ .forPath(localPath)
+ .thenApply(__ -> null)
+ ;
+ }
+
private AsyncWrappers()
{
}
[21/23] curator git commit: The entire migration set should be 1
transaction - not each individual migration
Posted by ra...@apache.org.
The entire migration set should be 1 transaction - not each individual migration
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/33e41388
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/33e41388
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/33e41388
Branch: refs/heads/master
Commit: 33e4138840904635a1793084051fd50b643794f1
Parents: c77ef82
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 18 08:10:43 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 18 08:10:43 2017 -0500
----------------------------------------------------------------------
.../x/async/migrations/MigrationManager.java | 19 +++++++++----------
.../src/site/confluence/migrations.confluence | 4 +++-
.../x/async/migrations/TestMigrationManager.java | 2 +-
3 files changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
index 56e7f04..e51f0e4 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -106,7 +106,6 @@ public class MigrationManager
}
int compareSize = Math.min(set.migrations().size(), operationHashesInOrder.size());
- List<Migration> subList = set.migrations().subList(0, compareSize);
for ( int i = 0; i < compareSize; ++i )
{
byte[] setHash = hash(set.migrations().get(i).operations());
@@ -184,23 +183,23 @@ public class MigrationManager
}
return asyncEnsureContainers(client, thisMetaDataPath)
- .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath));
+ .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, thisMetaDataPath));
}
@VisibleForTesting
volatile AtomicInteger debugCount = null;
- private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath)
+ private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, String thisMetaDataPath)
{
debugCount.incrementAndGet();
+ List<CuratorOp> operations = new ArrayList<>();
String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME);
- List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
- List<CuratorOp> operations = new ArrayList<>();
- operations.addAll(migration.operations());
- operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(operations)));
- return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
- }).collect(Collectors.toList());
- return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
+ toBeApplied.forEach(migration -> {
+ List<CuratorOp> thisMigrationOperations = migration.operations();
+ operations.addAll(thisMigrationOperations);
+ operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(thisMigrationOperations)));
+ });
+ return client.transaction().forOperations(operations).thenApply(__ -> null);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/site/confluence/migrations.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence
index 4775fac..bd2d36f 100644
--- a/curator-x-async/src/site/confluence/migrations.confluence
+++ b/curator-x-async/src/site/confluence/migrations.confluence
@@ -91,7 +91,9 @@ manager.migrate(set).exceptionally(e -> {
});
{code}
-Each migration in the set is applied in a transaction. MigrationManager stores a hash
+All migrations in the set are applied in a single transaction - i.e. all operations that comprise
+a migration set (the sum of all individual migration operations) are sent to ZooKeeper as a single
+transaction. MigrationManager stores a hash
of all operations in a migration so that it can be compared for future operations. i.e.
if, in the future, a migration set is attempted but the hash of one of the previous migrations
does not match, the stage completes exceptionally with {{MigrationException}}.
http://git-wip-us.apache.org/repos/asf/curator/blob/33e41388/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 786e704..47d09ab 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -268,7 +268,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertTrue(Throwables.getRootCause(e) instanceof KeeperException.NoNodeException);
}
- Assert.assertEquals(client.unwrap().getData().forPath("/test"), "something".getBytes());
+ Assert.assertNull(client.unwrap().checkExists().forPath("/test")); // should be all or nothing
}
@Test
[05/23] curator git commit: new generalization complete with initial
test
Posted by ra...@apache.org.
new generalization complete with initial test
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c3adc953
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c3adc953
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c3adc953
Branch: refs/heads/master
Commit: c3adc95315413ee5c0be861b528e80a380444e5b
Parents: 1a15582
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 09:36:51 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 09:36:51 2017 -0500
----------------------------------------------------------------------
.../async/modeled/migrations/MigrationManager.java | 10 +++++-----
.../modeled/migrations/TestMigrationManager.java | 17 +++++++----------
2 files changed, 12 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c3adc953/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 47adb1e..e59b7bf 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -147,13 +147,13 @@ public class MigrationManager
private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
{
- List<CuratorOp> operations = new ArrayList<>();
- for ( Migration migration : toBeApplied )
- {
+ List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
+ List<CuratorOp> operations = new ArrayList<>();
operations.addAll(migration.operations());
MetaData thisMetaData = new MetaData(migration.id(), migration.version());
operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
- }
- return client.transaction().forOperations(operations).thenApply(__ -> null);
+ return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
+ }).collect(Collectors.toList());
+ return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c3adc953/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index daf69cd..9fcd53c 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -99,11 +99,12 @@ public class TestMigrationManager extends CompletableBaseClassForTests
v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
- CuratorOp v1op = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+ CuratorOp v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
+ CuratorOp v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
- Migration m1 = Migration.build("1",1, () -> Collections.singletonList(v1op));
+ Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
@@ -123,18 +124,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests
@Test
public void testBasic() throws Exception
{
- ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
- ModelV1 v1 = new ModelV1("John Galt");
- complete(v1Client.child("1").set(v1));
-
MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
- complete(v3Client.child("1").read(), (m, e) -> {
- Assert.assertEquals(m.getAge(), 64);
- Assert.assertEquals(m.getFirstName(), "John");
- Assert.assertEquals(m.getLastName(), "Galt");
+ complete(v3Client.read(), (m, e) -> {
+ Assert.assertEquals(m.getAge(), 30);
+ Assert.assertEquals(m.getFirstName(), "One");
+ Assert.assertEquals(m.getLastName(), "Two");
});
}
}
[11/23] curator git commit: major refactoring and simplification. No
longer dependent on any modeled code so it's moved to the parent async
package
Posted by ra...@apache.org.
major refactoring and simplification. No longer dependent on any modeled code so it's moved to the parent async package
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d80651a7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d80651a7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d80651a7
Branch: refs/heads/master
Commit: d80651a7a276b0ce999601c30e205aceaae94d4c
Parents: 2ab172a
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:32:37 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:32:37 2017 -0500
----------------------------------------------------------------------
.../apache/curator/x/async/AsyncWrappers.java | 96 +++++++-
.../curator/x/async/migrations/Migration.java | 37 ++++
.../x/async/migrations/MigrationException.java | 37 ++++
.../x/async/migrations/MigrationManager.java | 195 ++++++++++++++++
.../x/async/migrations/MigrationSet.java | 72 ++++++
.../x/async/modeled/migrations/MetaData.java | 25 ---
.../x/async/modeled/migrations/Migration.java | 37 ----
.../modeled/migrations/MigrationException.java | 37 ----
.../modeled/migrations/MigrationManager.java | 220 -------------------
.../async/modeled/migrations/MigrationSet.java | 73 ------
.../async/migrations/TestMigrationManager.java | 173 +++++++++++++++
.../x/async/migrations/models/ModelV1.java | 39 ++++
.../x/async/migrations/models/ModelV2.java | 46 ++++
.../x/async/migrations/models/ModelV3.java | 53 +++++
.../migrations/TestMigrationManager.java | 173 ---------------
.../modeled/migrations/models/ModelV1.java | 39 ----
.../modeled/migrations/models/ModelV2.java | 46 ----
.../modeled/migrations/models/ModelV3.java | 53 -----
18 files changed, 747 insertions(+), 704 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..d7b3cc3 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,12 +18,16 @@
*/
package org.apache.curator.x.async;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
import java.util.Collections;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -70,6 +74,66 @@ import java.util.concurrent.TimeUnit;
public class AsyncWrappers
{
/**
+ * <p>
+ * Return the children of the given path (keyed by the full path) and the data for each node.
+ * IMPORTANT: this results in a ZooKeeper query
+ * for each child node returned. i.e. if the initial children() call returns
+ * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+ * </p>
+ *
+ * <p>
+ * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+ * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+ * </p>
+ *
+ * @return CompletionStage
+ */
+ public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path)
+ {
+ return childrenWithData(client, path, false);
+ }
+
+ /**
+ * <p>
+ * Return the children of the given path (keyed by the full path) and the data for each node.
+ * IMPORTANT: this results in a ZooKeeper query
+ * for each child node returned. i.e. if the initial children() call returns
+ * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+ * </p>
+ *
+ * <p>
+ * Note: if any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException}
+ * is <strong>NOT</strong> set. Instead the stage is completed with an empty map.
+ * </p>
+ *
+ * @param isCompressed pass true if data is compressed
+ * @return CompletionStage
+ */
+ public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed)
+ {
+ CompletableFuture<Map<String, byte[]>> future = new CompletableFuture<>();
+ client.getChildren().forPath(path).handle((children, e) -> {
+ if ( e != null )
+ {
+ if ( Throwables.getRootCause(e) instanceof KeeperException.NoNodeException )
+ {
+ future.complete(Maps.newHashMap());
+ }
+ else
+ {
+ future.completeExceptionally(e);
+ }
+ }
+ else
+ {
+ completeChildren(client, future, path, children, isCompressed);
+ }
+ return null;
+ });
+ return future;
+ }
+
+ /**
* Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using
* the given executor
*
@@ -279,6 +343,36 @@ public class AsyncWrappers
}
}
+ private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed)
+ {
+ Map<String, byte[]> nodes = Maps.newHashMap();
+ if ( children.size() == 0 )
+ {
+ future.complete(nodes);
+ return;
+ }
+
+ children.forEach(node -> {
+ String path = ZKPaths.makePath(parentPath, node);
+ AsyncStage<byte[]> stage = isCompressed ? client.getData().decompressed().forPath(path) : client.getData().forPath(path);
+ stage.handle((data, e) -> {
+ if ( e != null )
+ {
+ future.completeExceptionally(e);
+ }
+ else
+ {
+ nodes.put(path, data);
+ if ( nodes.size() == children.size() )
+ {
+ future.complete(nodes);
+ }
+ }
+ return null;
+ });
+ });
+ }
+
private AsyncWrappers()
{
}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
new file mode 100644
index 0000000..daac435
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import java.util.List;
+
+/**
+ * Models a single migration/transition
+ */
+@FunctionalInterface
+public interface Migration
+{
+ /**
+ * Return the operations to execute in a transaction. IMPORTANT: during a migration
+ * this method may be called multiple times.
+ *
+ * @return operations
+ */
+ List<CuratorOp> operations();
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
new file mode 100644
index 0000000..c4971ce
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import java.util.Objects;
+
+public class MigrationException extends RuntimeException
+{
+ private final String migrationId;
+
+ public MigrationException(String migrationId, String message)
+ {
+ super(message);
+ this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+ }
+
+ public String getMigrationId()
+ {
+ return migrationId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
new file mode 100644
index 0000000..cb3d6ff
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.google.common.base.Throwables;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.imps.ExtractingCuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+/**
+ * Manages migrations
+ */
+public class MigrationManager
+{
+ private final AsyncCuratorFramework client;
+ private final String lockPath;
+ private final Executor executor;
+ private final Duration lockMax;
+
+ private static final String META_DATA_NODE_NAME = "meta-";
+
+ /**
+ * @param client the curator client
+ * @param lockPath base path for locks used by the manager
+ * @param executor the executor to use
+ * @param lockMax max time to wait for locks
+ */
+ public MigrationManager(AsyncCuratorFramework client, String lockPath, Executor executor, Duration lockMax)
+ {
+ this.client = Objects.requireNonNull(client, "client cannot be null");
+ this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+ this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+ this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+ }
+
+ /**
+ * Process the given migration set
+ *
+ * @param set the set
+ * @return completion stage. If there is a migration-specific error, the stage will be completed
+ * exceptionally with {@link org.apache.curator.x.async.migrations.MigrationException}.
+ */
+ public CompletionStage<Void> migrate(MigrationSet set)
+ {
+ InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), ZKPaths.makePath(lockPath, set.id()));
+ CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
+ return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+ }
+
+ /**
+ * Can be overridden to change how the comparison to previous migrations is done. The default
+ * version ensures that the meta data from previous migrations matches the current migration
+ * set exactly (by order and operation hash). If there is a mismatch, <code>MigrationException</code> is thrown.
+ *
+ * @param set the migration set being applied
+ * @param operationHashesInOrder previous operation hashes (may be empty)
+ * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
+ * @throws MigrationException errors
+ */
+ protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException
+ {
+ if ( operationHashesInOrder.size() > set.migrations().size() )
+ {
+ throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
+ }
+
+ int compareSize = Math.min(set.migrations().size(), operationHashesInOrder.size());
+ List<Migration> subList = set.migrations().subList(0, compareSize);
+ for ( int i = 0; i < compareSize; ++i )
+ {
+ byte[] setHash = hash(set.migrations().get(i).operations());
+ if ( !Arrays.equals(setHash, operationHashesInOrder.get(i)) )
+ {
+ throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
+ }
+ }
+ return set.migrations().subList(operationHashesInOrder.size(), set.migrations().size());
+ }
+
+ private byte[] hash(List<CuratorOp> operations)
+ {
+ MessageDigest digest;
+ try
+ {
+ digest = MessageDigest.getInstance("SHA-256");
+ }
+ catch ( NoSuchAlgorithmException e )
+ {
+ throw new RuntimeException(e);
+ }
+ operations.forEach(op -> {
+ if ( op instanceof ExtractingCuratorOp )
+ {
+ ((ExtractingCuratorOp)op).addToDigest(digest);
+ }
+ else
+ {
+ digest.update(op.toString().getBytes());
+ }
+ });
+ return digest.digest();
+ }
+
+ private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+ {
+ return childrenWithData(client, set.metaDataPath())
+ .thenCompose(metaData -> applyMetaData(set, metaData))
+ .handle((v, e) -> {
+ release(lock, true);
+ if ( e != null )
+ {
+ Throwables.propagate(e);
+ }
+ return v;
+ }
+ );
+ }
+
+ private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData)
+ {
+ List<byte[]> sortedMetaData = metaData.keySet()
+ .stream()
+ .sorted(Comparator.naturalOrder())
+ .map(metaData::get)
+ .collect(Collectors.toList());
+
+ List<Migration> toBeApplied;
+ try
+ {
+ toBeApplied = filter(set, sortedMetaData);
+ }
+ catch ( MigrationException e )
+ {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+
+ if ( toBeApplied.size() == 0 )
+ {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return asyncEnsureContainers(client, set.metaDataPath())
+ .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied));
+ }
+
+ private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied)
+ {
+ String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), META_DATA_NODE_NAME);
+ List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
+ List<CuratorOp> operations = new ArrayList<>();
+ operations.addAll(migration.operations());
+ operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(operations)));
+ return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
+ }).collect(Collectors.toList());
+ return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
new file mode 100644
index 0000000..089d3d8
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Models a set of migrations. Each individual migration is applied
+ * in a transaction.
+ */
+public interface MigrationSet
+{
+ /**
+ * @return the unique ID for this migration set
+ */
+ String id();
+
+ /**
+ * @return where to store the meta data for this migration set
+ */
+ String metaDataPath();
+
+ /**
+ * @return list of migrations in the order that they should be applied
+ */
+ List<Migration> migrations();
+
+ static MigrationSet build(String id, String metaDataPath, List<Migration> migrations)
+ {
+ Objects.requireNonNull(id, "id cannot be null");
+ Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
+ final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
+ return new MigrationSet()
+ {
+ @Override
+ public String id()
+ {
+ return id;
+ }
+
+ @Override
+ public String metaDataPath()
+ {
+ return metaDataPath;
+ }
+
+ @Override
+ public List<Migration> migrations()
+ {
+ return migrationsCopy;
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
deleted file mode 100644
index 8377967..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-@FunctionalInterface
-public interface MetaData
-{
- byte[] operationHash();
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
deleted file mode 100644
index 972c59e..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import java.util.List;
-
-/**
- * Models a single migration/transition
- */
-@FunctionalInterface
-public interface Migration
-{
- /**
- * Return the operations to execute in a transaction. IMPORTANT: during a migration
- * this method may be called multiple times.
- *
- * @return operations
- */
- List<CuratorOp> operations();
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
deleted file mode 100644
index 1a1d59c..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import java.util.Objects;
-
-public class MigrationException extends RuntimeException
-{
- private final String migrationId;
-
- public MigrationException(String migrationId, String message)
- {
- super(message);
- this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
- }
-
- public String getMigrationId()
- {
- return migrationId;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
deleted file mode 100644
index d6d37de..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.base.Throwables;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.imps.ExtractingCuratorOp;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZNode;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.CreateMode;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.curator.x.async.AsyncWrappers.*;
-
-/**
- * Manages migrations
- */
-public class MigrationManager
-{
- private final AsyncCuratorFramework client;
- private final ZPath lockPath;
- private final Executor executor;
- private final Duration lockMax;
-
- private static final String META_DATA_NODE_NAME = "meta-";
-
- /**
- * @param client the curator client
- * @param lockPath base path for locks used by the manager
- * @param executor the executor to use
- * @param lockMax max time to wait for locks
- */
- public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax)
- {
- this.client = Objects.requireNonNull(client, "client cannot be null");
- this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
- this.executor = Objects.requireNonNull(executor, "executor cannot be null");
- this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
- }
-
- /**
- * Process the given migration set
- *
- * @param set the set
- * @return completion stage. If there is a migration-specific error, the stage will be completed
- * exceptionally with {@link org.apache.curator.x.async.modeled.migrations.MigrationException}.
- */
- public CompletionStage<Void> migrate(MigrationSet set)
- {
- String lockPath = this.lockPath.child(set.id()).fullPath();
- InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
- CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
- return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
- }
-
- /**
- * Can be overridden to change how the comparison to previous migrations is done. The default
- * version ensures that the meta data from previous migrations matches the current migration
- * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
- *
- * @param set the migration set being applied
- * @param sortedMetaData previous migration meta data (may be empty)
- * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
- * @throws MigrationException errors
- */
- protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
- {
- if ( sortedMetaData.size() > set.migrations().size() )
- {
- throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
- }
-
- int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
- List<Migration> subList = set.migrations().subList(0, compareSize);
- for ( int i = 0; i < compareSize; ++i )
- {
- byte[] setHash = hash(set.migrations().get(i).operations()).operationHash();
- if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) )
- {
- throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
- }
- }
- return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
- }
-
- private MetaData hash(List<CuratorOp> operations)
- {
- MessageDigest digest;
- try
- {
- digest = MessageDigest.getInstance("SHA-256");
- }
- catch ( NoSuchAlgorithmException e )
- {
- throw new RuntimeException(e);
- }
- operations.forEach(op -> {
- if ( op instanceof ExtractingCuratorOp )
- {
- ((ExtractingCuratorOp)op).addToDigest(digest);
- }
- else
- {
- digest.update(op.toString().getBytes());
- }
- });
- return digest::digest;
- }
-
- private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
- {
- ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
- return modeled.childrenAsZNodes()
- .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
- .handle((v, e) -> {
- release(lock, true);
- if ( e != null )
- {
- Throwables.propagate(e);
- }
- return v;
- }
- );
- }
-
- private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
- {
- ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>()
- {
- @Override
- public byte[] serialize(MetaData model)
- {
- return model.operationHash();
- }
-
- @Override
- public MetaData deserialize(byte[] bytes)
- {
- return () -> bytes;
- }
- };
- ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
- return ModeledFramework.wrap(client, modelSpec);
- }
-
- private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
- {
- List<MetaData> sortedMetaData = metaDataNodes
- .stream()
- .sorted(Comparator.comparing(m -> m.path().fullPath()))
- .map(ZNode::model)
- .collect(Collectors.toList());
-
- List<Migration> toBeApplied;
- try
- {
- toBeApplied = filter(set, sortedMetaData);
- }
- catch ( MigrationException e )
- {
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.completeExceptionally(e);
- return future;
- }
-
- if ( toBeApplied.size() == 0 )
- {
- return CompletableFuture.completedFuture(null);
- }
-
- return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
- .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient));
- }
-
- private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
- {
- List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
- List<CuratorOp> operations = new ArrayList<>();
- operations.addAll(migration.operations());
- MetaData thisMetaData = hash(operations);
- operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
- return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
- }).collect(Collectors.toList());
- return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
deleted file mode 100644
index c4cd90e..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.curator.x.async.modeled.ZPath;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Models a set of migrations. Each individual migration is applied
- * in a transaction.
- */
-public interface MigrationSet
-{
- /**
- * @return the unique ID for this migration set
- */
- String id();
-
- /**
- * @return where to store the meta data for this migration set
- */
- ZPath metaDataPath();
-
- /**
- * @return list of migrations in the order that they should be applied
- */
- List<Migration> migrations();
-
- static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
- {
- Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
- final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
- return new MigrationSet()
- {
- @Override
- public String id()
- {
- return id;
- }
-
- @Override
- public ZPath metaDataPath()
- {
- return metaDataPath;
- }
-
- @Override
- public List<Migration> migrations()
- {
- return migrationsCopy;
- }
- };
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
new file mode 100644
index 0000000..42bc76d
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.modeled.JacksonModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.migrations.models.ModelV1;
+import org.apache.curator.x.async.migrations.models.ModelV2;
+import org.apache.curator.x.async.migrations.models.ModelV3;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.UnaryOperator;
+
+/**
+ * Tests for {@link MigrationManager}. Each test builds a {@link MigrationSet} from the
+ * operations prepared in {@link #setup()}, runs it through the manager and then reads
+ * the model node back to check the stored values.
+ */
+public class TestMigrationManager extends CompletableBaseClassForTests
+{
+    private AsyncCuratorFramework client;
+    private ModelSpec<ModelV1> v1Spec;
+    private ModelSpec<ModelV2> v2Spec;
+    private ModelSpec<ModelV3> v3Spec;
+    private ExecutorService executor;
+    private CuratorOp v1opA;
+    private CuratorOp v1opB;
+    private CuratorOp v2op;
+    private CuratorOp v3op;
+    private MigrationManager manager;
+
+    /**
+     * Starts a client, builds the three model specs (all on the same path) together
+     * with the operations used by the migrations, and creates the manager under test.
+     */
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        rawClient.start();
+
+        this.client = AsyncCuratorFramework.wrap(rawClient);
+
+        // NOTE(review): mapper, from1to2 and from2to3 are not referenced by any test
+        // in this class - confirm whether they are still needed
+        ObjectMapper mapper = new ObjectMapper();
+        UnaryOperator<byte[]> from1to2 = bytes -> {
+            try
+            {
+                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
+                ModelV2 v2 = new ModelV2(v1.getName(), 64);
+                return mapper.writeValueAsBytes(v2);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        UnaryOperator<byte[]> from2to3 = bytes -> {
+            try
+            {
+                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
+                String[] nameParts = v2.getName().split("\\s");
+                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
+                return mapper.writeValueAsBytes(v3);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        ZPath modelPath = ZPath.parse("/test/it");
+
+        v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
+        v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
+        v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+
+        // v1opA creates the parent of the model path, v1opB creates the V1 model,
+        // v2op and v3op overwrite it with the V2 and V3 forms
+        v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
+        v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+        v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+        v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
+
+        executor = Executors.newCachedThreadPool();
+        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
+    }
+
+    /**
+     * Closes the client and stops the executor created in {@link #setup()}.
+     */
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        CloseableUtils.closeQuietly(client.unwrap());
+        executor.shutdownNow();
+        super.teardown();
+    }
+
+    /**
+     * Runs all three migrations in a single set and checks that the stored model
+     * ends up with the V3 values.
+     */
+    @Test
+    public void testBasic() throws Exception
+    {
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        Migration m2 = () -> Collections.singletonList(v2op);
+        Migration m3 = () -> Collections.singletonList(v3op);
+        MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3));
+
+        complete(manager.migrate(migrationSet));
+
+        assertV3Applied();
+    }
+
+    /**
+     * Runs the migration set with the same ID three times, each time with one more
+     * migration appended, and checks the stored model after every run.
+     */
+    @Test
+    public void testStaged() throws Exception
+    {
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
+        complete(v1Client.read(), (m, e) -> {
+            // fail on the reported error instead of an NPE on the missing model
+            Assert.assertNull(e);
+            Assert.assertEquals(m.getName(), "Test");
+        });
+
+        Migration m2 = () -> Collections.singletonList(v2op);
+        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
+        complete(v2Client.read(), (m, e) -> {
+            Assert.assertNull(e);
+            Assert.assertEquals(m.getName(), "Test 2");
+            Assert.assertEquals(m.getAge(), 10);
+        });
+
+        Migration m3 = () -> Collections.singletonList(v3op);
+        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3));
+        complete(manager.migrate(migrationSet));
+
+        assertV3Applied();
+    }
+
+    /**
+     * Reads the model through the V3 spec and asserts that it holds the values
+     * written by <code>v3op</code>.
+     */
+    private void assertV3Applied()
+    {
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.read(), (m, e) -> {
+            // fail on the reported error instead of an NPE on the missing model
+            Assert.assertNull(e);
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
+        });
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
new file mode 100644
index 0000000..46fc5ff
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+/**
+ * Test model that holds a single name value.
+ */
+public class ModelV1
+{
+    private final String name;
+
+    /**
+     * @param name the name to hold
+     */
+    public ModelV1(String name)
+    {
+        this.name = name;
+    }
+
+    /**
+     * Creates a model with an empty name.
+     */
+    public ModelV1()
+    {
+        this("");
+    }
+
+    /**
+     * @return the name given at construction
+     */
+    public String getName()
+    {
+        return this.name;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
new file mode 100644
index 0000000..31e05bd
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+/**
+ * Test model that holds a name and an age.
+ */
+public class ModelV2
+{
+    private final String name;
+    private final int age;
+
+    /**
+     * @param name the name to hold
+     * @param age the age to hold
+     */
+    public ModelV2(String name, int age)
+    {
+        this.age = age;
+        this.name = name;
+    }
+
+    /**
+     * Creates a model with an empty name and an age of zero.
+     */
+    public ModelV2()
+    {
+        this("", 0);
+    }
+
+    /**
+     * @return the name given at construction
+     */
+    public String getName()
+    {
+        return this.name;
+    }
+
+    /**
+     * @return the age given at construction
+     */
+    public int getAge()
+    {
+        return this.age;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
new file mode 100644
index 0000000..519923c
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+/**
+ * Test model that holds a first name, a last name and an age.
+ */
+public class ModelV3
+{
+    private final String firstName;
+    private final String lastName;
+    private final int age;
+
+    /**
+     * @param firstName the first name to hold
+     * @param lastName the last name to hold
+     * @param age the age to hold
+     */
+    public ModelV3(String firstName, String lastName, int age)
+    {
+        this.age = age;
+        this.lastName = lastName;
+        this.firstName = firstName;
+    }
+
+    /**
+     * Creates a model with empty names and an age of zero.
+     */
+    public ModelV3()
+    {
+        this("", "", 0);
+    }
+
+    /**
+     * @return the first name given at construction
+     */
+    public String getFirstName()
+    {
+        return this.firstName;
+    }
+
+    /**
+     * @return the last name given at construction
+     */
+    public String getLastName()
+    {
+        return this.lastName;
+    }
+
+    /**
+     * @return the age given at construction
+     */
+    public int getAge()
+    {
+        return this.age;
+    }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
deleted file mode 100644
index 45aa130..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.CompletableBaseClassForTests;
-import org.apache.curator.x.async.modeled.JacksonModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV1;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV2;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV3;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.UnaryOperator;
-
-public class TestMigrationManager extends CompletableBaseClassForTests
-{
- private AsyncCuratorFramework client;
- private ModelSpec<ModelV1> v1Spec;
- private ModelSpec<ModelV2> v2Spec;
- private ModelSpec<ModelV3> v3Spec;
- private ExecutorService executor;
- private CuratorOp v1opA;
- private CuratorOp v1opB;
- private CuratorOp v2op;
- private CuratorOp v3op;
- private MigrationManager manager;
-
- @BeforeMethod
- @Override
- public void setup() throws Exception
- {
- super.setup();
-
- CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
- rawClient.start();
-
- this.client = AsyncCuratorFramework.wrap(rawClient);
-
- ObjectMapper mapper = new ObjectMapper();
- UnaryOperator<byte[]> from1to2 = bytes -> {
- try
- {
- ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
- ModelV2 v2 = new ModelV2(v1.getName(), 64);
- return mapper.writeValueAsBytes(v2);
- }
- catch ( IOException e )
- {
- throw new RuntimeException(e);
- }
- };
-
- UnaryOperator<byte[]> from2to3 = bytes -> {
- try
- {
- ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
- String[] nameParts = v2.getName().split("\\s");
- ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
- return mapper.writeValueAsBytes(v3);
- }
- catch ( IOException e )
- {
- throw new RuntimeException(e);
- }
- };
-
- ZPath modelPath = ZPath.parse("/test/it");
-
- v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
- v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
- v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
-
- v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
- v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
- v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
- v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
-
- executor = Executors.newCachedThreadPool();
- manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
- }
-
- @AfterMethod
- @Override
- public void teardown() throws Exception
- {
- CloseableUtils.closeQuietly(client.unwrap());
- executor.shutdownNow();
- super.teardown();
- }
-
- @Test
- public void testBasic() throws Exception
- {
- Migration m1 = () -> Arrays.asList(v1opA, v1opB);
- Migration m2 = () -> Collections.singletonList(v2op);
- Migration m3 = () -> Collections.singletonList(v3op);
- MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
-
- complete(manager.migrate(migrationSet));
-
- ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
- complete(v3Client.read(), (m, e) -> {
- Assert.assertEquals(m.getAge(), 30);
- Assert.assertEquals(m.getFirstName(), "One");
- Assert.assertEquals(m.getLastName(), "Two");
- });
- }
-
- @Test
- public void testStaged() throws Exception
- {
- Migration m1 = () -> Arrays.asList(v1opA, v1opB);
- MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
- complete(manager.migrate(migrationSet));
-
- ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
- complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
-
- Migration m2 = () -> Collections.singletonList(v2op);
- migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
- complete(manager.migrate(migrationSet));
-
- ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
- complete(v2Client.read(), (m, e) -> {
- Assert.assertEquals(m.getName(), "Test 2");
- Assert.assertEquals(m.getAge(), 10);
- });
-
- Migration m3 = () -> Collections.singletonList(v3op);
- migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
- complete(manager.migrate(migrationSet));
-
- ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
- complete(v3Client.read(), (m, e) -> {
- Assert.assertEquals(m.getAge(), 30);
- Assert.assertEquals(m.getFirstName(), "One");
- Assert.assertEquals(m.getLastName(), "Two");
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
deleted file mode 100644
index 02b13b7..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV1
-{
- private final String name;
-
- public ModelV1()
- {
- this("");
- }
-
- public ModelV1(String name)
- {
- this.name = name;
- }
-
- public String getName()
- {
- return name;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
deleted file mode 100644
index bd77a2e..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV2
-{
- private final String name;
- private final int age;
-
- public ModelV2()
- {
- this("", 0);
- }
-
- public ModelV2(String name, int age)
- {
- this.name = name;
- this.age = age;
- }
-
- public String getName()
- {
- return name;
- }
-
- public int getAge()
- {
- return age;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
deleted file mode 100644
index d4713b8..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV3
-{
- private final String firstName;
- private final String lastName;
- private final int age;
-
- public ModelV3()
- {
- this("", "", 0);
- }
-
- public ModelV3(String firstName, String lastName, int age)
- {
- this.firstName = firstName;
- this.lastName = lastName;
- this.age = age;
- }
-
- public String getFirstName()
- {
- return firstName;
- }
-
- public String getLastName()
- {
- return lastName;
- }
-
- public int getAge()
- {
- return age;
- }
-}
[02/23] curator git commit: Merge branch 'CURATOR-423' into
CURATOR-421
Posted by ra...@apache.org.
Merge branch 'CURATOR-423' into CURATOR-421
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1784a7c6
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1784a7c6
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1784a7c6
Branch: refs/heads/master
Commit: 1784a7c68982bae8bed471d92464168f87b0fceb
Parents: 4cd731c 4a0e022
Author: randgalt <ra...@apache.org>
Authored: Thu Jul 13 23:52:58 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Jul 13 23:52:58 2017 -0500
----------------------------------------------------------------------
.../x/async/details/AsyncTransactionOpImpl.java | 4 ++--
.../curator/x/async/TestBasicOperations.java | 22 ++++++++++++++------
2 files changed, 18 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
[22/23] curator git commit: reformat doc a bit
Posted by ra...@apache.org.
reformat doc a bit
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/69ec4d7e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/69ec4d7e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/69ec4d7e
Branch: refs/heads/master
Commit: 69ec4d7e6cbe30452fc096f1d5fb706ba118413c
Parents: 33e4138
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 18 08:15:57 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 18 08:15:57 2017 -0500
----------------------------------------------------------------------
curator-x-async/src/site/confluence/migrations.confluence | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/69ec4d7e/curator-x-async/src/site/confluence/migrations.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence
index bd2d36f..403e64a 100644
--- a/curator-x-async/src/site/confluence/migrations.confluence
+++ b/curator-x-async/src/site/confluence/migrations.confluence
@@ -91,9 +91,9 @@ manager.migrate(set).exceptionally(e -> {
});
{code}
-Each migration in the set is applied in a single transaction - i.e. all operations that comprise
+* Each migration in the set is applied in a single transaction - i.e. all operations that comprise
a migration set (the sum of all individual migration operations) are sent to ZooKeeper as a single
-transaction. MigrationManager stores a hash
-of all operations in a migration so that it can be compared for future operations. i.e.
+transaction.
+* MigrationManager stores a hash of all operations in a migration so that it can be compared on future migration attempts; i.e.,
if, in the future, a migration set is attempted but the hash of one of the previous migrations
does not match, the stage completes exceptionally with {{MigrationException}}.
[15/23] curator git commit: more testing, bug fixes
Posted by ra...@apache.org.
more testing, bug fixes
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f7e728b9
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f7e728b9
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f7e728b9
Branch: refs/heads/master
Commit: f7e728b99b8bf8cf1e0b2d68da1c3bf10e75d52a
Parents: 909ed9a
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 17:06:42 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 17:06:42 2017 -0500
----------------------------------------------------------------------
.../framework/imps/ExtractingCuratorOp.java | 2 +-
.../x/async/migrations/MigrationManager.java | 7 +++
.../async/migrations/TestMigrationManager.java | 53 ++++++++++++++++++++
3 files changed, 61 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f7e728b9/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
index 58a1572..5b179e7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
@@ -49,7 +49,7 @@ public class ExtractingCuratorOp implements CuratorOp
public void addToDigest(MessageDigest digest)
{
-
+ record.addToDigest(digest);
}
private void validate()
http://git-wip-us.apache.org/repos/asf/curator/blob/f7e728b9/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
index 676eef6..56e7f04 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.x.async.migrations;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Throwables;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.imps.ExtractingCuratorOp;
@@ -39,6 +40,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.curator.x.async.AsyncWrappers.*;
@@ -185,8 +187,13 @@ public class MigrationManager
.thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath));
}
+ @VisibleForTesting
+ volatile AtomicInteger debugCount = null;
+
private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath)
{
+ debugCount.incrementAndGet();
+
String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME);
List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
List<CuratorOp> operations = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/curator/blob/f7e728b9/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 19740d6..3522911 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.x.async.migrations;
+import com.google.common.base.Throwables;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -41,6 +42,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
public class TestMigrationManager extends CompletableBaseClassForTests
{
@@ -79,6 +81,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
executor = Executors.newCachedThreadPool();
manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
+ manager.debugCount = new AtomicInteger();
}
@AfterMethod
@@ -106,6 +109,10 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertEquals(m.getFirstName(), "One");
Assert.assertEquals(m.getLastName(), "Two");
});
+
+ int count = manager.debugCount.get();
+ complete(manager.migrate(migrationSet));
+ Assert.assertEquals(manager.debugCount.get(), count); // second call should do nothing
}
@Test
@@ -165,4 +172,50 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertNull(client.unwrap().checkExists().forPath("/main"));
}
+
+    /**
+     * Applies a migration, then submits a migration set with the same ID in which the
+     * data of the second operation differs, and expects that second attempt to fail
+     * with a {@link MigrationException} as the root cause.
+     */
+    @Test
+    public void testChecksumDataError() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        complete(manager.migrate(migrationSet));
+
+        // same set ID and same first op, but a different payload for the second op
+        CuratorOp op2Changed = client.transactionOp().create().forPath("/test/bar", "second".getBytes());
+        migration = () -> Arrays.asList(op1, op2Changed);
+        migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+
+        // Capture the failure rather than calling Assert.fail() inside the try block:
+        // the catch takes Throwable, so it would also catch the AssertionError raised by
+        // Assert.fail() and the test would report an unhelpful "expected true" failure.
+        Throwable thrown = null;
+        try
+        {
+            complete(manager.migrate(migrationSet));
+        }
+        catch ( Throwable e )
+        {
+            thrown = e;
+        }
+        Assert.assertNotNull(thrown, "Should throw");
+        Throwable rootCause = Throwables.getRootCause(thrown);
+        Assert.assertTrue(rootCause instanceof MigrationException, "Unexpected root cause: " + rootCause);
+    }
+
+    /**
+     * Applies a migration, then submits a migration set with the same ID in which the
+     * path of the second operation differs, and expects that second attempt to fail
+     * with a {@link MigrationException} as the root cause.
+     */
+    @Test
+    public void testChecksumPathError() throws Exception
+    {
+        CuratorOp op1 = client.transactionOp().create().forPath("/test2");
+        CuratorOp op2 = client.transactionOp().create().forPath("/test2/bar");
+        Migration migration = () -> Arrays.asList(op1, op2);
+        MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+        complete(manager.migrate(migrationSet));
+
+        // same set ID and same first op, but the second op now targets a different path
+        CuratorOp op2Changed = client.transactionOp().create().forPath("/test/bar");
+        migration = () -> Arrays.asList(op1, op2Changed);
+        migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+
+        // Capture the failure rather than calling Assert.fail() inside the try block:
+        // the catch takes Throwable, so it would also catch the AssertionError raised by
+        // Assert.fail() and the test would report an unhelpful "expected true" failure.
+        Throwable thrown = null;
+        try
+        {
+            complete(manager.migrate(migrationSet));
+        }
+        catch ( Throwable e )
+        {
+            thrown = e;
+        }
+        Assert.assertNotNull(thrown, "Should throw");
+        Throwable rootCause = Throwables.getRootCause(thrown);
+        Assert.assertTrue(rootCause instanceof MigrationException, "Unexpected root cause: " + rootCause);
+    }
}
[20/23] curator git commit: break up the help menu a bit
Posted by ra...@apache.org.
break up the help menu a bit
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c77ef823
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c77ef823
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c77ef823
Branch: refs/heads/master
Commit: c77ef823f91c4ef8713614aec181f681e7a7af8c
Parents: 84191f3
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:28:55 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:28:55 2017 -0500
----------------------------------------------------------------------
src/site/site.xml | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c77ef823/src/site/site.xml
----------------------------------------------------------------------
diff --git a/src/site/site.xml b/src/site/site.xml
index 4b1ac98..8136c9a 100644
--- a/src/site/site.xml
+++ b/src/site/site.xml
@@ -70,15 +70,18 @@
<item name="Getting Started" href="getting-started.html"/>
<item name="Examples" href="curator-examples/index.html"/>
<item name="Recipes" href="curator-recipes/index.html"/>
- <item name="Framework" href="curator-framework/index.html"/>
- <item name="Utilities" href="utilities.html"/>
- <item name="Client" href="curator-client/index.html"/>
<item name="Java 8/Async" href="curator-x-async/index.html"/>
<item name="Strongly Typed Models" href="curator-x-async/modeled.html"/>
<item name="Migrations" href="curator-x-async/migrations.html"/>
<item name="Schema Support" href="curator-framework/schema.html"/>
</menu>
+ <menu name="Low Level" inherit="top">
+ <item name="Framework" href="curator-framework/index.html"/>
+ <item name="Utilities" href="utilities.html"/>
+ <item name="Client" href="curator-client/index.html"/>
+ </menu>
+
<menu name="Details" inherit="top">
<item name="Error Handling" href="errors.html"/>
<item name="Logging and Tracing" href="logging.html"/>
[09/23] curator git commit: refactoring and simplification. No need
for ids and versions in Migrations/MetaData. A hash can be auto-generated.
Posted by ra...@apache.org.
refactoring and simplification. No need for ids and versions in Migrations/MetaData. A hash can be auto-generated.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c8df9a41
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c8df9a41
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c8df9a41
Branch: refs/heads/master
Commit: c8df9a414b9a035f45946460ff7e1adff7fd65d4
Parents: 4f12abc
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:00:27 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:00:27 2017 -0500
----------------------------------------------------------------------
.../imps/CuratorMultiTransactionRecord.java | 11 +++
.../framework/imps/ExtractingCuratorOp.java | 8 +-
.../x/async/modeled/migrations/MetaData.java | 74 +-----------------
.../x/async/modeled/migrations/Migration.java | 49 ++----------
.../modeled/migrations/MigrationManager.java | 81 +++++++++++++-------
.../src/site/confluence/index.confluence | 1 +
.../migrations/TestMigrationManager.java | 14 ++--
7 files changed, 87 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 0611df6..3e72609 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TypeAndPath;
import org.apache.zookeeper.MultiTransactionRecord;
import org.apache.zookeeper.Op;
+import java.security.MessageDigest;
import java.util.List;
class CuratorMultiTransactionRecord extends MultiTransactionRecord
@@ -50,4 +51,14 @@ class CuratorMultiTransactionRecord extends MultiTransactionRecord
{
return metadata.size();
}
+
+ void addToDigest(MessageDigest digest)
+ {
+ for ( Op op : this )
+ {
+ digest.update(op.getPath().getBytes());
+ digest.update(Integer.toString(op.getType()).getBytes());
+ digest.update(op.toRequestRecord().toString().getBytes());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
index 7a5db69..58a1572 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
@@ -22,8 +22,9 @@ import com.google.common.base.Preconditions;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.TypeAndPath;
import org.apache.zookeeper.Op;
+import java.security.MessageDigest;
-class ExtractingCuratorOp implements CuratorOp
+public class ExtractingCuratorOp implements CuratorOp
{
private final CuratorMultiTransactionRecord record = new CuratorMultiTransactionRecord();
@@ -46,6 +47,11 @@ class ExtractingCuratorOp implements CuratorOp
return record.iterator().next();
}
+ public void addToDigest(MessageDigest digest)
+ {
+
+ }
+
private void validate()
{
Preconditions.checkArgument(record.size() > 0, "No operation has been added");
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
index da40a5b..8377967 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
@@ -18,76 +18,8 @@
*/
package org.apache.curator.x.async.modeled.migrations;
-import java.util.Objects;
-
-/**
- * The meta data of a single migration
- */
-public class MetaData
+@FunctionalInterface
+public interface MetaData
{
- private final String migrationId;
- private final int migrationVersion;
-
- public MetaData()
- {
- this("", 0);
- }
-
- public MetaData(String migrationId, int migrationVersion)
- {
- this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
- this.migrationVersion = migrationVersion;
- }
-
- /**
- * @return The ID of the migration that was applied
- */
- public String getMigrationId()
- {
- return migrationId;
- }
-
- /**
- * @return the version of the migration that was applied
- */
- public int getMigrationVersion()
- {
- return migrationVersion;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if ( this == o )
- {
- return true;
- }
- if ( o == null || getClass() != o.getClass() )
- {
- return false;
- }
-
- MetaData metaData = (MetaData)o;
-
- //noinspection SimplifiableIfStatement
- if ( migrationVersion != metaData.migrationVersion )
- {
- return false;
- }
- return migrationId.equals(metaData.migrationId);
- }
-
- @Override
- public int hashCode()
- {
- int result = migrationId.hashCode();
- result = 31 * result + migrationVersion;
- return result;
- }
-
- @Override
- public String toString()
- {
- return "MetaData{" + "migrationId='" + migrationId + '\'' + ", migrationVersion=" + migrationVersion + '}';
- }
+ byte[] operationHash();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index b456580..972c59e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -20,57 +20,18 @@ package org.apache.curator.x.async.modeled.migrations;
import org.apache.curator.framework.api.transaction.CuratorOp;
import java.util.List;
-import java.util.Objects;
-import java.util.function.Supplier;
/**
* Models a single migration/transition
*/
+@FunctionalInterface
public interface Migration
{
/**
- * @return the unique ID for this migration
- */
- String id();
-
- /**
- * @return the version of this migration
- */
- int version();
-
- /**
- * @return the operations to execute in a transaction
+ * Return the operations to execute in a transaction. IMPORTANT: during a migration
+ * this method may be called multiple times.
+ *
+ * @return operations
*/
List<CuratorOp> operations();
-
- static Migration build(String id, Supplier<List<CuratorOp>> operationsProc)
- {
- return build(id, 1, operationsProc);
- }
-
- static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
- {
- Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(operationsProc, "operationsProc cannot be null");
- return new Migration()
- {
- @Override
- public String id()
- {
- return id;
- }
-
- @Override
- public int version()
- {
- return version;
- }
-
- @Override
- public List<CuratorOp> operations()
- {
- return operationsProc.get();
- }
- };
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 01de2f8..d6d37de 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async.modeled.migrations;
import com.google.common.base.Throwables;
import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.imps.ExtractingCuratorOp;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -29,8 +30,11 @@ import org.apache.curator.x.async.modeled.ModeledFramework;
import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.zookeeper.CreateMode;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@@ -49,27 +53,21 @@ public class MigrationManager
{
private final AsyncCuratorFramework client;
private final ZPath lockPath;
- private final ModelSerializer<MetaData> metaDataSerializer;
private final Executor executor;
private final Duration lockMax;
private static final String META_DATA_NODE_NAME = "meta-";
/**
- * Jackson usage: See the note in {@link org.apache.curator.x.async.modeled.JacksonModelSerializer} regarding how the Jackson library is specified in Curator's Maven file.
- * Unless you are not using Jackson pass <code>JacksonModelSerializer.build(MetaData.class)</code> for <code>metaDataSerializer</code>
- *
* @param client the curator client
* @param lockPath base path for locks used by the manager
- * @param metaDataSerializer JacksonModelSerializer.build(MetaData.class)
* @param executor the executor to use
* @param lockMax max time to wait for locks
*/
- public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
+ public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
- this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
this.executor = Objects.requireNonNull(executor, "executor cannot be null");
this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
}
@@ -90,18 +88,6 @@ public class MigrationManager
}
/**
- * Utility to return the meta data from previous migrations
- *
- * @param set the set
- * @return stage
- */
- public CompletionStage<List<MetaData>> metaData(MigrationSet set)
- {
- ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
- return ZNode.models(modeled.childrenAsZNodes());
- }
-
- /**
* Can be overridden to change how the comparison to previous migrations is done. The default
* version ensures that the meta data from previous migrations matches the current migration
* set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
@@ -115,21 +101,46 @@ public class MigrationManager
{
if ( sortedMetaData.size() > set.migrations().size() )
{
- throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
+ throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
}
int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
- List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
- .stream()
- .map(m -> new MetaData(m.id(), m.version()))
- .collect(Collectors.toList());
- if ( !compareMigrations.equals(sortedMetaData) )
+ List<Migration> subList = set.migrations().subList(0, compareSize);
+ for ( int i = 0; i < compareSize; ++i )
{
- throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
+ byte[] setHash = hash(set.migrations().get(i).operations()).operationHash();
+ if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) )
+ {
+ throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
+ }
}
return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
}
+ private MetaData hash(List<CuratorOp> operations)
+ {
+ MessageDigest digest;
+ try
+ {
+ digest = MessageDigest.getInstance("SHA-256");
+ }
+ catch ( NoSuchAlgorithmException e )
+ {
+ throw new RuntimeException(e);
+ }
+ operations.forEach(op -> {
+ if ( op instanceof ExtractingCuratorOp )
+ {
+ ((ExtractingCuratorOp)op).addToDigest(digest);
+ }
+ else
+ {
+ digest.update(op.toString().getBytes());
+ }
+ });
+ return digest::digest;
+ }
+
private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
{
ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
@@ -148,7 +159,21 @@ public class MigrationManager
private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
{
- ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+ ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>()
+ {
+ @Override
+ public byte[] serialize(MetaData model)
+ {
+ return model.operationHash();
+ }
+
+ @Override
+ public MetaData deserialize(byte[] bytes)
+ {
+ return () -> bytes;
+ }
+ };
+ ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
return ModeledFramework.wrap(client, modelSpec);
}
@@ -186,7 +211,7 @@ public class MigrationManager
List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
List<CuratorOp> operations = new ArrayList<>();
operations.addAll(migration.operations());
- MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+ MetaData thisMetaData = hash(operations);
operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
}).collect(Collectors.toList());
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/index.confluence b/curator-x-async/src/site/confluence/index.confluence
index 4d81d44..74a47b4 100644
--- a/curator-x-async/src/site/confluence/index.confluence
+++ b/curator-x-async/src/site/confluence/index.confluence
@@ -40,6 +40,7 @@ This is a strongly typed DSL that allows you to map a Curator\-style client to:
* Options for how nodes should be created (sequential, compressed data, ttl, etc.)
* ACLs for the nodes at the path
* Options for how to delete nodes (guaranteed, deleting children, etc.)
+* Perform ZooKeeper data migration
For example:
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index 3fe5de2..45aa130 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -109,7 +109,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
executor = Executors.newCachedThreadPool();
- manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+ manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
}
@AfterMethod
@@ -124,9 +124,9 @@ public class TestMigrationManager extends CompletableBaseClassForTests
@Test
public void testBasic() throws Exception
{
- Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
- Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
- Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+ Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+ Migration m2 = () -> Collections.singletonList(v2op);
+ Migration m3 = () -> Collections.singletonList(v3op);
MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));
@@ -142,14 +142,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests
@Test
public void testStaged() throws Exception
{
- Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
+ Migration m1 = () -> Arrays.asList(v1opA, v1opB);
MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
- Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+ Migration m2 = () -> Collections.singletonList(v2op);
migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
complete(manager.migrate(migrationSet));
@@ -159,7 +159,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertEquals(m.getAge(), 10);
});
- Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+ Migration m3 = () -> Collections.singletonList(v3op);
migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));
[13/23] curator git commit: removed some dead code
Posted by ra...@apache.org.
removed some dead code
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/72ff5a9f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/72ff5a9f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/72ff5a9f
Branch: refs/heads/master
Commit: 72ff5a9fa4c8b0db85dd4e017009143147ac9446
Parents: c40a383
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 14:11:04 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 14:11:04 2017 -0500
----------------------------------------------------------------------
.../async/migrations/TestMigrationManager.java | 37 ++------------------
1 file changed, 3 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/72ff5a9f/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 637aca1..19740d6 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.curator.x.async.migrations;
-import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.transaction.CuratorOp;
@@ -26,24 +25,22 @@ import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.migrations.models.ModelV1;
+import org.apache.curator.x.async.migrations.models.ModelV2;
+import org.apache.curator.x.async.migrations.models.ModelV3;
import org.apache.curator.x.async.modeled.JacksonModelSerializer;
import org.apache.curator.x.async.modeled.ModelSpec;
import org.apache.curator.x.async.modeled.ModeledFramework;
import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.curator.x.async.migrations.models.ModelV1;
-import org.apache.curator.x.async.migrations.models.ModelV2;
-import org.apache.curator.x.async.migrations.models.ModelV3;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.function.UnaryOperator;
public class TestMigrationManager extends CompletableBaseClassForTests
{
@@ -69,34 +66,6 @@ public class TestMigrationManager extends CompletableBaseClassForTests
this.client = AsyncCuratorFramework.wrap(rawClient);
- ObjectMapper mapper = new ObjectMapper();
- UnaryOperator<byte[]> from1to2 = bytes -> {
- try
- {
- ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
- ModelV2 v2 = new ModelV2(v1.getName(), 64);
- return mapper.writeValueAsBytes(v2);
- }
- catch ( IOException e )
- {
- throw new RuntimeException(e);
- }
- };
-
- UnaryOperator<byte[]> from2to3 = bytes -> {
- try
- {
- ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
- String[] nameParts = v2.getName().split("\\s");
- ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
- return mapper.writeValueAsBytes(v3);
- }
- catch ( IOException e )
- {
- throw new RuntimeException(e);
- }
- };
-
ZPath modelPath = ZPath.parse("/test/it");
v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
[23/23] curator git commit: Merge branch 'master' into CURATOR-421
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/7a60af0d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/7a60af0d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/7a60af0d
Branch: refs/heads/master
Commit: 7a60af0dddbf1e547ddb7448ce99e14341413ef0
Parents: 69ec4d7 7d4f062
Author: randgalt <ra...@apache.org>
Authored: Tue Jul 18 08:16:05 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Tue Jul 18 08:16:05 2017 -0500
----------------------------------------------------------------------
.../framework/imps/GetDataBuilderImpl.java | 5 +
.../modeled/details/ModeledFrameworkImpl.java | 2 +-
.../curator/x/async/TestBasicOperations.java | 10 ++
.../x/async/modeled/TestModeledFramework.java | 34 ++---
.../discovery/details/TestServiceDiscovery.java | 130 +++++++------------
5 files changed, 78 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
[12/23] curator git commit: some more refactoring, tests and doc
Posted by ra...@apache.org.
some more refactoring, tests and doc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c40a3836
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c40a3836
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c40a3836
Branch: refs/heads/master
Commit: c40a3836131ad49279e05a5c3dd6656848c1eb60
Parents: d80651a
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 13:32:05 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 13:32:05 2017 -0500
----------------------------------------------------------------------
.../x/async/migrations/MigrationManager.java | 20 ++--
.../x/async/migrations/MigrationSet.java | 14 +--
.../src/site/confluence/index.confluence | 9 +-
.../src/site/confluence/migrations.confluence | 97 ++++++++++++++++++++
curator-x-async/src/site/site.xml | 2 +-
.../async/migrations/TestMigrationManager.java | 36 +++++++-
6 files changed, 150 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
index cb3d6ff..676eef6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -50,6 +50,7 @@ public class MigrationManager
{
private final AsyncCuratorFramework client;
private final String lockPath;
+ private final String metaDataPath;
private final Executor executor;
private final Duration lockMax;
@@ -58,13 +59,15 @@ public class MigrationManager
/**
* @param client the curator client
* @param lockPath base path for locks used by the manager
+ * @param metaDataPath base path to store the meta data
* @param executor the executor to use
* @param lockMax max time to wait for locks
*/
- public MigrationManager(AsyncCuratorFramework client, String lockPath, Executor executor, Duration lockMax)
+ public MigrationManager(AsyncCuratorFramework client, String lockPath, String metaDataPath, Executor executor, Duration lockMax)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+ this.metaDataPath = Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
this.executor = Objects.requireNonNull(executor, "executor cannot be null");
this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
}
@@ -139,8 +142,9 @@ public class MigrationManager
private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
{
- return childrenWithData(client, set.metaDataPath())
- .thenCompose(metaData -> applyMetaData(set, metaData))
+ String thisMetaDataPath = ZKPaths.makePath(metaDataPath, set.id());
+ return childrenWithData(client, thisMetaDataPath)
+ .thenCompose(metaData -> applyMetaData(set, metaData, thisMetaDataPath))
.handle((v, e) -> {
release(lock, true);
if ( e != null )
@@ -152,7 +156,7 @@ public class MigrationManager
);
}
- private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData)
+ private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData, String thisMetaDataPath)
{
List<byte[]> sortedMetaData = metaData.keySet()
.stream()
@@ -177,13 +181,13 @@ public class MigrationManager
return CompletableFuture.completedFuture(null);
}
- return asyncEnsureContainers(client, set.metaDataPath())
- .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied));
+ return asyncEnsureContainers(client, thisMetaDataPath)
+ .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath));
}
- private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied)
+ private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath)
{
- String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), META_DATA_NODE_NAME);
+ String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME);
List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
List<CuratorOp> operations = new ArrayList<>();
operations.addAll(migration.operations());
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
index 089d3d8..94b5205 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
@@ -34,19 +34,13 @@ public interface MigrationSet
String id();
/**
- * @return where to store the meta data for this migration set
- */
- String metaDataPath();
-
- /**
* @return list of migrations in the order that they should be applied
*/
List<Migration> migrations();
- static MigrationSet build(String id, String metaDataPath, List<Migration> migrations)
+ static MigrationSet build(String id, List<Migration> migrations)
{
Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
return new MigrationSet()
{
@@ -57,12 +51,6 @@ public interface MigrationSet
}
@Override
- public String metaDataPath()
- {
- return metaDataPath;
- }
-
- @Override
public List<Migration> migrations()
{
return migrationsCopy;
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/index.confluence b/curator-x-async/src/site/confluence/index.confluence
index 74a47b4..64f3786 100644
--- a/curator-x-async/src/site/confluence/index.confluence
+++ b/curator-x-async/src/site/confluence/index.confluence
@@ -40,7 +40,6 @@ This is a strongly typed DSL that allows you to map a Curator\-style client to:
* Options for how nodes should be created (sequential, compressed data, ttl, etc.)
* ACLs for the nodes at the path
* Options for how to delete nodes (guaranteed, deleting children, etc.)
-* Perform ZooKeeper data migration
For example:
@@ -50,3 +49,11 @@ modeled.set(new Foo());
{code}
See [[Modeled Curator|modeled.html]] for details.
+
+h2. [[Migrations|migrations.html]]
+
+Curator Migrations allow you to pre\-apply transactions in a staged manner so that you
+can ensure a consistent state for parts of your ZooKeeper node hierarchy in a manner
+similar to database migration utilities.
+
+See [[Migrations|migrations.html]] for details.
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/confluence/migrations.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence
new file mode 100644
index 0000000..4775fac
--- /dev/null
+++ b/curator-x-async/src/site/confluence/migrations.confluence
@@ -0,0 +1,97 @@
+h1. Migrations
+
+Curator Migrations allow you to pre\-apply transactions in a staged manner so that you
+can ensure a consistent state for parts of your ZooKeeper node hierarchy in a manner
+similar to database migration utilities.
+
+h2. Background and Usage
+
+Note: To use Migrations, you should be familiar with Java 8's lambdas, CompletableFuture and CompletionStage.
+
+A "migration" is a set of operations to be performed in a transaction. A "migration set" is a list
+of migrations. Combined, this can be used to ensure an initial state for your ZooKeeper nodes as
+well as supporting upgrading/modifying existing state.
+
+For example, given a brand new ZooKeeper instance you might want to populate a few nodes
+and data. E.g.
+
+{code}
+CuratorOp op1 = client.transactionOp().create().forPath("/parent");
+CuratorOp op2 = client.transactionOp().create().forPath("/parent/one");
+CuratorOp op3 = client.transactionOp().create().forPath("/parent/two");
+CuratorOp op4 = client.transactionOp().create().forPath("/parent/three");
+CuratorOp op5 = client.transactionOp().create().forPath("/main", someData);
+{code}
+
+All 5 of these operations would be combined into a migration and set:
+
+{code}
+Migration migration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+MigrationSet set = MigrationSet.build("main", Collections.singletonList(migration));
+{code}
+
+This set can then be passed to a {{MigrationManager}} for processing. The MigrationManager
+checks to see if the migration has been applied already and, if not, processes the transaction.
+
+At a future date, the migration set could be expanded to update/modify things. E.g.
+
+{code}
+CuratorOp newOp1 = client.transactionOp().create().forPath("/new");
+CuratorOp newOp2 = client.transactionOp().delete().forPath("/main"); // maybe this is no longer needed
+{code}
+
+This would be combined with the previous migration:
+
+{code}
+Migration initialMigration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+Migration newMigration = () -> Arrays.asList(newOp1, newOp2);
+MigrationSet set = MigrationSet.build("main", Arrays.asList(initialMigration, newMigration));
+{code}
+
+When this set is run, the MigrationManager will perform both migration operations on new
+ZooKeeper databases but only the second "newMigration" on ZK databases that already have
+the first migration applied.
+
+h2. Details/Reference
+
+_Migration_
+
+A Migration is a wrapper around a list of operations that constitute one stage in a migration
+set and are applied as a single transaction.
+
+_MigrationSet_
+
+A MigrationSet is an ordered list of Migrations. Curator keeps track of which migrations in a
+set have been previously applied and only processes un\-applied migrations. Each migration
+set must have a unique identifier. Create a MigrationSet via its builder:
+
+{code}
+MigrationSet set = MigrationSet.build(migrationId, migrations);
+{code}
+
+_MigrationManager_
+
+The MigrationManager processes MigrationSets. Usually, you'd run this only on new ZooKeeper
+databases or as part of a maintenance operation to update the ZooKeeper database. E.g.
+
+{code}
+MigrationManager manager = new MigrationManager(client,
+ lockPath, // base path for locks used by the manager
+ metaDataPath, // base path to store the meta data
+ executor, // the executor to use
+ lockMax // max time to wait for locks
+);
+manager.migrate(set).exceptionally(e -> {
+ if ( e instanceof MigrationException ) {
+ // migration checksum failed, etc.
+ } else {
+ // some other kind of error
+ }
+ return null;
+});
+{code}
+
+Each migration in the set is applied in a transaction. MigrationManager stores a hash
+of all operations in a migration so that it can be compared for future operations. i.e.
+if, in the future, a migration set is attempted but the hash of one of the previous migrations
+does not match, the stage completes exceptionally with {{MigrationException}}.
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/site.xml
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/site.xml b/curator-x-async/src/site/site.xml
index f78abc7..fc0a67a 100644
--- a/curator-x-async/src/site/site.xml
+++ b/curator-x-async/src/site/site.xml
@@ -25,7 +25,7 @@
<link rel="stylesheet" href="../css/site.css" />
<script type="text/javascript">
$(function(){
- if ( location && location.pathname && location.pathname.endsWith('/index.html') ) {
+ if ( location && location.pathname && (location.pathname.endsWith('/index.html') || location.pathname.endsWith('/migrations.html')) ) {
$('a[title="Java 8/Async"]').parent().addClass("active");
} else {
$('a[title="Strongly Typed Models"]').parent().addClass("active");
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 42bc76d..637aca1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -109,7 +109,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
executor = Executors.newCachedThreadPool();
- manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
+ manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
}
@AfterMethod
@@ -127,7 +127,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Migration m1 = () -> Arrays.asList(v1opA, v1opB);
Migration m2 = () -> Collections.singletonList(v2op);
Migration m3 = () -> Collections.singletonList(v3op);
- MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3));
+ MigrationSet migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));
@@ -143,14 +143,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests
public void testStaged() throws Exception
{
Migration m1 = () -> Arrays.asList(v1opA, v1opB);
- MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1));
+ MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(m1));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
Migration m2 = () -> Collections.singletonList(v2op);
- migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2));
+ migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
@@ -160,7 +160,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
});
Migration m3 = () -> Collections.singletonList(v3op);
- migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3));
+ migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
@@ -170,4 +170,30 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertEquals(m.getLastName(), "Two");
});
}
+
+ @Test
+ public void testDocExample() throws Exception
+ {
+ CuratorOp op1 = client.transactionOp().create().forPath("/parent");
+ CuratorOp op2 = client.transactionOp().create().forPath("/parent/one");
+ CuratorOp op3 = client.transactionOp().create().forPath("/parent/two");
+ CuratorOp op4 = client.transactionOp().create().forPath("/parent/three");
+ CuratorOp op5 = client.transactionOp().create().forPath("/main", "hey".getBytes());
+
+ Migration initialMigration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+ MigrationSet migrationSet = MigrationSet.build("main", Collections.singletonList(initialMigration));
+ complete(manager.migrate(migrationSet));
+
+ Assert.assertNotNull(client.unwrap().checkExists().forPath("/parent/three"));
+ Assert.assertEquals(client.unwrap().getData().forPath("/main"), "hey".getBytes());
+
+ CuratorOp newOp1 = client.transactionOp().create().forPath("/new");
+ CuratorOp newOp2 = client.transactionOp().delete().forPath("/main"); // maybe this is no longer needed
+
+ Migration newMigration = () -> Arrays.asList(newOp1, newOp2);
+ migrationSet = MigrationSet.build("main", Arrays.asList(initialMigration, newMigration));
+ complete(manager.migrate(migrationSet));
+
+ Assert.assertNull(client.unwrap().checkExists().forPath("/main"));
+ }
}
[03/23] curator git commit: no longer used
Posted by ra...@apache.org.
no longer used
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a0c1620
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a0c1620
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a0c1620
Branch: refs/heads/master
Commit: 1a0c1620ef3b9bbac0f41dddf153ae7e83aed8c0
Parents: 1784a7c
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 08:07:36 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 08:07:36 2017 -0500
----------------------------------------------------------------------
.../x/async/modeled/migrations/Result.java | 23 --------------------
1 file changed, 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1a0c1620/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
deleted file mode 100644
index 4fcfeac..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Result.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-public interface Result
-{
-}
[07/23] curator git commit: Merge branch 'master' into CURATOR-421
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ecbd3866
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ecbd3866
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ecbd3866
Branch: refs/heads/master
Commit: ecbd386645e015a72929ce37f33772c7b5efb3d1
Parents: bc7bf4a 11be719
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 10:09:31 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 10:09:31 2017 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------
[17/23] curator git commit: more tests
Posted by ra...@apache.org.
more tests
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9385d049
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9385d049
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9385d049
Branch: refs/heads/master
Commit: 9385d0490d8c684a602b4c57489e39941a9a178b
Parents: 75118e4
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 17:34:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 17:34:00 2017 -0500
----------------------------------------------------------------------
.../async/migrations/TestMigrationManager.java | 88 +++++++++++++++++++-
1 file changed, 87 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/9385d049/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 80a03bb..786e704 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -25,6 +25,7 @@ import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.AsyncWrappers;
import org.apache.curator.x.async.CompletableBaseClassForTests;
import org.apache.curator.x.async.migrations.models.ModelV1;
import org.apache.curator.x.async.migrations.models.ModelV2;
@@ -41,12 +42,20 @@ import org.testng.annotations.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
public class TestMigrationManager extends CompletableBaseClassForTests
{
+ private static final String LOCK_PATH = "/migrations/locks";
+ private static final String META_DATA_PATH = "/migrations/metadata";
private AsyncCuratorFramework client;
private ModelSpec<ModelV1> v1Spec;
private ModelSpec<ModelV2> v2Spec;
@@ -57,6 +66,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
private CuratorOp v2op;
private CuratorOp v3op;
private MigrationManager manager;
+ private final AtomicReference<CountDownLatch> filterLatch = new AtomicReference<>();
@BeforeMethod
@Override
@@ -81,7 +91,27 @@ public class TestMigrationManager extends CompletableBaseClassForTests
v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
executor = Executors.newCachedThreadPool();
- manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
+ manager = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMinutes(10))
+ {
+ @Override
+ protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException
+ {
+ CountDownLatch localLatch = filterLatch.getAndSet(null);
+ if ( localLatch != null )
+ {
+ try
+ {
+ localLatch.await();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
+ Throwables.propagate(e);
+ }
+ }
+ return super.filter(set, operationHashesInOrder);
+ }
+ };
manager.debugCount = new AtomicInteger();
}
@@ -260,4 +290,60 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertNull(client.unwrap().checkExists().forPath("/test"));
}
+
+ @Test
+ public void testConcurrency1() throws Exception
+ {
+ CuratorOp op1 = client.transactionOp().create().forPath("/test");
+ CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+ Migration migration = () -> Arrays.asList(op1, op2);
+ MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+ CountDownLatch latch = new CountDownLatch(1);
+ filterLatch.set(latch);
+ CompletionStage<Void> first = manager.migrate(migrationSet);
+
+ MigrationManager manager2 = new MigrationManager(client, LOCK_PATH, META_DATA_PATH, executor, Duration.ofMillis(timing.forSleepingABit().milliseconds()));
+ try
+ {
+ complete(manager2.migrate(migrationSet));
+ Assert.fail("Should throw");
+ }
+ catch ( Throwable e )
+ {
+ Assert.assertTrue(Throwables.getRootCause(e) instanceof AsyncWrappers.TimeoutException);
+ }
+
+ latch.countDown();
+ complete(first);
+ Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+ }
+
+ @Test
+ public void testConcurrency2() throws Exception
+ {
+ CuratorOp op1 = client.transactionOp().create().forPath("/test");
+ CuratorOp op2 = client.transactionOp().create().forPath("/test/bar", "first".getBytes());
+ Migration migration = () -> Arrays.asList(op1, op2);
+ MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(migration));
+ CountDownLatch latch = new CountDownLatch(1);
+ filterLatch.set(latch);
+ CompletionStage<Void> first = manager.migrate(migrationSet);
+
+ CompletionStage<Void> second = manager.migrate(migrationSet);
+ try
+ {
+ second.toCompletableFuture().get(timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS);
+ Assert.fail("Should throw");
+ }
+ catch ( Throwable e )
+ {
+ Assert.assertTrue(Throwables.getRootCause(e) instanceof TimeoutException);
+ }
+
+ latch.countDown();
+ complete(first);
+ Assert.assertEquals(client.unwrap().getData().forPath("/test/bar"), "first".getBytes());
+ complete(second);
+ Assert.assertEquals(manager.debugCount.get(), 1);
+ }
}
[10/23] curator git commit: Merge branch 'master' into CURATOR-421
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2ab172a4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2ab172a4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2ab172a4
Branch: refs/heads/master
Commit: 2ab172a45722efb08e85b6b4d230d9d04b4ce15b
Parents: c8df9a4 1b6216e
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:05:00 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:05:00 2017 -0500
----------------------------------------------------------------------
.../main/java/org/apache/curator/x/async/AsyncWrappers.java | 6 ++++--
.../java/org/apache/curator/x/async/TestBasicOperations.java | 3 +--
2 files changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
[06/23] curator git commit: more testing
Posted by ra...@apache.org.
more testing
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/bc7bf4ac
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/bc7bf4ac
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/bc7bf4ac
Branch: refs/heads/master
Commit: bc7bf4ace6dcc186352f1c3731e1cae69cef2e1f
Parents: c3adc95
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 10:08:04 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 10:08:04 2017 -0500
----------------------------------------------------------------------
.../modeled/migrations/MigrationManager.java | 4 +-
.../migrations/TestMigrationManager.java | 58 ++++++++++++++++----
2 files changed, 49 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/bc7bf4ac/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index e59b7bf..bfb9707 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -79,7 +79,7 @@ public class MigrationManager
{
if ( sortedMetaData.size() > set.migrations().size() )
{
- throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+ throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
}
int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
@@ -89,7 +89,7 @@ public class MigrationManager
.collect(Collectors.toList());
if ( !compareMigrations.equals(sortedMetaData) )
{
- throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+ throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
}
return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
}
http://git-wip-us.apache.org/repos/asf/curator/blob/bc7bf4ac/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index 9fcd53c..3fe5de2 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -48,11 +48,15 @@ import java.util.function.UnaryOperator;
public class TestMigrationManager extends CompletableBaseClassForTests
{
private AsyncCuratorFramework client;
- private MigrationSet migrationSet;
private ModelSpec<ModelV1> v1Spec;
private ModelSpec<ModelV2> v2Spec;
private ModelSpec<ModelV3> v3Spec;
private ExecutorService executor;
+ private CuratorOp v1opA;
+ private CuratorOp v1opB;
+ private CuratorOp v2op;
+ private CuratorOp v3op;
+ private MigrationManager manager;
@BeforeMethod
@Override
@@ -99,17 +103,13 @@ public class TestMigrationManager extends CompletableBaseClassForTests
v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
- CuratorOp v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
- CuratorOp v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
- CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
- CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
-
- Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
- Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
- Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
- migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
+ v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
+ v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+ v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+ v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
executor = Executors.newCachedThreadPool();
+ manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
}
@AfterMethod
@@ -124,7 +124,43 @@ public class TestMigrationManager extends CompletableBaseClassForTests
@Test
public void testBasic() throws Exception
{
- MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+ Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
+ Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+ Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+ MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
+
+ complete(manager.migrate(migrationSet));
+
+ ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+ complete(v3Client.read(), (m, e) -> {
+ Assert.assertEquals(m.getAge(), 30);
+ Assert.assertEquals(m.getFirstName(), "One");
+ Assert.assertEquals(m.getLastName(), "Two");
+ });
+ }
+
+ @Test
+ public void testStaged() throws Exception
+ {
+ Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
+ MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
+ complete(manager.migrate(migrationSet));
+
+ ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
+ complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
+
+ Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+ migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
+ complete(manager.migrate(migrationSet));
+
+ ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
+ complete(v2Client.read(), (m, e) -> {
+ Assert.assertEquals(m.getName(), "Test 2");
+ Assert.assertEquals(m.getAge(), 10);
+ });
+
+ Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+ migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
[19/23] curator git commit: Merge branch 'master' into CURATOR-421
Posted by ra...@apache.org.
Merge branch 'master' into CURATOR-421
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/84191f3d
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/84191f3d
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/84191f3d
Branch: refs/heads/master
Commit: 84191f3d46d015cb49ebebb42fddc1019b3db6c3
Parents: 9d30a8c 123f2ec
Author: randgalt <ra...@apache.org>
Authored: Sat Jul 15 11:03:53 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jul 15 11:03:53 2017 -0500
----------------------------------------------------------------------
----------------------------------------------------------------------