You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/14 14:36:57 UTC
[2/3] curator git commit: reworking so that this feature is more general. Now manages any set of transactions
reworking so that this feature is more general. Now manages any set of transactions
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a15582b
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a15582b
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a15582b
Branch: refs/heads/CURATOR-421
Commit: 1a15582b59ccfaf80b1547fd57bcb7dfe894b1c8
Parents: 1a0c162
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 09:24:33 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 09:24:33 2017 -0500
----------------------------------------------------------------------
.../InvalidMigrationSetException.java | 37 ----
.../x/async/modeled/migrations/Migration.java | 14 +-
.../modeled/migrations/MigrationException.java | 37 ++++
.../modeled/migrations/MigrationManager.java | 132 +++++++++++++-
.../migrations/MigrationManagerBuilder.java | 68 -------
.../migrations/MigrationManagerImpl.java | 181 -------------------
.../async/modeled/migrations/MigrationSet.java | 11 +-
.../migrations/TestMigrationManager.java | 35 ++--
8 files changed, 196 insertions(+), 319 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
deleted file mode 100644
index 84b21bf..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import java.util.Objects;
-
-public class InvalidMigrationSetException extends RuntimeException
-{
- private final String migrationId;
-
- public InvalidMigrationSetException(String migrationId, String message)
- {
- super(message);
- this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
- }
-
- public String getMigrationId()
- {
- return migrationId;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index b3919d1..63a7a7d 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -18,8 +18,10 @@
*/
package org.apache.curator.x.async.modeled.migrations;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import java.util.List;
import java.util.Objects;
-import java.util.function.UnaryOperator;
+import java.util.function.Supplier;
public interface Migration
{
@@ -27,12 +29,12 @@ public interface Migration
int version();
- byte[] migrate(byte[] previousBytes);
+ List<CuratorOp> operations();
- static Migration build(String id, int version, UnaryOperator<byte[]> migrateProc)
+ static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
{
Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(migrateProc, "migrateProc cannot be null");
+ Objects.requireNonNull(operationsProc, "operationsProc cannot be null");
return new Migration()
{
@Override
@@ -48,9 +50,9 @@ public interface Migration
}
@Override
- public byte[] migrate(byte[] previousBytes)
+ public List<CuratorOp> operations()
{
- return migrateProc.apply(previousBytes);
+ return operationsProc.get();
}
};
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
new file mode 100644
index 0000000..1a1d59c
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.modeled.migrations;
+
+import java.util.Objects;
+
+public class MigrationException extends RuntimeException
+{
+ private final String migrationId;
+
+ public MigrationException(String migrationId, String message)
+ {
+ super(message);
+ this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
+ }
+
+ public String getMigrationId()
+ {
+ return migrationId;
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 2d5f39f..47adb1e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -18,20 +18,142 @@
*/
package org.apache.curator.x.async.modeled.migrations;
+import com.google.common.base.Throwables;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.x.async.AsyncCuratorFramework;
import org.apache.curator.x.async.modeled.ModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.CreateMode;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Comparator;
import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
-public interface MigrationManager
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+public class MigrationManager
{
- CompletionStage<List<MetaData>> metaData(ZPath metaDataPath);
+ private final AsyncCuratorFramework client;
+ private final ZPath lockPath;
+ private final ModelSerializer<MetaData> metaDataSerializer;
+ private final Executor executor;
+ private final Duration lockMax;
+
+ private static final String META_DATA_NODE_NAME = "meta-";
+
+ public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
+ {
+ this.client = Objects.requireNonNull(client, "client cannot be null");
+ this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+ this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
+ this.executor = Objects.requireNonNull(executor, "executor cannot be null");
+ this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
+ }
+
+ public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
+ {
+ ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
+ return ZNode.models(modeled.childrenAsZNodes());
+ }
+
+ public CompletionStage<Void> migrate(MigrationSet set)
+ {
+ String lockPath = this.lockPath.child(set.id()).fullPath();
+ InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
+ CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
+ return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+ }
+
+ protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
+ {
+ if ( sortedMetaData.size() > set.migrations().size() )
+ {
+ throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+ }
- CompletionStage<Void> run();
+ int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
+ List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
+ .stream()
+ .map(m -> new MetaData(m.id(), m.version()))
+ .collect(Collectors.toList());
+ if ( !compareMigrations.equals(sortedMetaData) )
+ {
+ throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
+ }
+ return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
+ }
+
+ private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
+ {
+ ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
+ return modeled.childrenAsZNodes()
+ .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
+ .handle((v, e) -> {
+ release(lock, true);
+ if ( e != null )
+ {
+ Throwables.propagate(e);
+ }
+ return v;
+ }
+ );
+ }
+
+ private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
+ {
+ ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+ return ModeledFramework.wrap(client, modelSpec);
+ }
+
+ private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
+ {
+ List<MetaData> sortedMetaData = metaDataNodes
+ .stream()
+ .sorted(Comparator.comparing(m -> m.path().fullPath()))
+ .map(ZNode::model)
+ .collect(Collectors.toList());
+
+ List<Migration> toBeApplied;
+ try
+ {
+ toBeApplied = filter(set, sortedMetaData);
+ }
+ catch ( MigrationException e )
+ {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+
+ if ( toBeApplied.size() == 0 )
+ {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
+ .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient));
+ }
- static MigrationManagerBuilder builder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
+ private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
{
- return new MigrationManagerBuilder(client, lockPath, metaDataSerializer);
+ List<CuratorOp> operations = new ArrayList<>();
+ for ( Migration migration : toBeApplied )
+ {
+ operations.addAll(migration.operations());
+ MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+ operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
+ }
+ return client.transaction().forOperations(operations).thenApply(__ -> null);
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
deleted file mode 100644
index ed48242..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ZPath;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.Executor;
-
-public class MigrationManagerBuilder
-{
- private final AsyncCuratorFramework client;
- private final ZPath lockPath;
- private final ModelSerializer<MetaData> metaDataSerializer;
- private final List<MigrationSet> sets = new ArrayList<>();
- private Executor executor = Runnable::run;
- private Duration lockMax = Duration.ofSeconds(15);
-
- MigrationManagerBuilder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer)
- {
- this.client = Objects.requireNonNull(client, "client cannot be null");
- this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
- this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
- }
-
- public MigrationManager build()
- {
- return new MigrationManagerImpl(client, lockPath, metaDataSerializer, executor, lockMax, sets);
- }
-
- public MigrationManagerBuilder withExecutor(Executor executor)
- {
- this.executor = Objects.requireNonNull(executor, "executor cannot be null");
- return this;
- }
-
- public MigrationManagerBuilder withLockMax(Duration lockMax)
- {
- this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
- return this;
- }
-
- public MigrationManagerBuilder adding(MigrationSet set)
- {
- sets.add(Objects.requireNonNull(set, "set cannot be null"));
- return this;
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
deleted file mode 100644
index 15c61b2..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZNode;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.CreateMode;
-import java.time.Duration;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.curator.x.async.AsyncWrappers.*;
-
-class MigrationManagerImpl implements MigrationManager
-{
- private final AsyncCuratorFramework client;
- private final ZPath lockPath;
- private final ModelSerializer<MetaData> metaDataSerializer;
- private final Executor executor;
- private final Duration lockMax;
- private final List<MigrationSet> sets;
-
- private static final String META_DATA_NODE_NAME = "meta-";
-
- MigrationManagerImpl(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax, List<MigrationSet> sets)
- {
- this.client = client;
- this.lockPath = lockPath;
- this.metaDataSerializer = metaDataSerializer;
- this.executor = executor;
- this.lockMax = lockMax;
- this.sets = ImmutableList.copyOf(sets);
- }
-
- @Override
- public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath)
- {
- ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath);
- return ZNode.models(modeled.childrenAsZNodes());
- }
-
- @Override
- public CompletionStage<Void> run()
- {
- Map<String, CompletableFuture<Void>> futures = sets
- .stream()
- .map(m -> new AbstractMap.SimpleEntry<>(m.id(), runMigration(m).toCompletableFuture()))
- .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
- return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[futures.size()]));
- }
-
- private CompletionStage<Void> runMigration(MigrationSet set)
- {
- String lockPath = this.lockPath.child(set.id()).fullPath();
- InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
- CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
- return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
- }
-
- private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
- {
- ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
- return modeled.childrenAsZNodes()
- .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
- .handle((v, e) -> {
- release(lock, true);
- if ( e != null )
- {
- Throwables.propagate(e);
- }
- return v;
- }
- );
- }
-
- private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
- {
- ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
- return ModeledFramework.wrap(client, modelSpec);
- }
-
- protected void checkIsValid(MigrationSet set, List<MetaData> sortedMetaData) throws InvalidMigrationSetException
- {
- if ( sortedMetaData.size() > set.migrations().size() )
- {
- throw new InvalidMigrationSetException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
- }
-
- int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
- List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
- .stream()
- .map(m -> new MetaData(m.id(), m.version()))
- .collect(Collectors.toList());
- if ( !compareMigrations.equals(sortedMetaData) )
- {
- throw new InvalidMigrationSetException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData));
- }
- }
-
- private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
- {
- List<MetaData> sortedMetaData = metaDataNodes
- .stream()
- .sorted(Comparator.comparing(m -> m.path().fullPath()))
- .map(ZNode::model)
- .collect(Collectors.toList());
- try
- {
- checkIsValid(set, sortedMetaData);
- }
- catch ( InvalidMigrationSetException e )
- {
- CompletableFuture<Void> future = new CompletableFuture<>();
- future.completeExceptionally(e);
- return future;
- }
-
- List<Migration> toBeApplied = set.migrations().subList(sortedMetaData.size(), set.migrations().size());
- if ( toBeApplied.size() == 0 )
- {
- return CompletableFuture.completedFuture(null);
- }
-
- return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
- .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, metaDataClient));
- }
-
- private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
- {
- ModelSpec<byte[]> modelSpec = ModelSpec.builder(set.path(), ModelSerializer.raw).build();
- ModeledFramework<byte[]> modeled = ModeledFramework.wrap(client, modelSpec);
- return modeled.childrenAsZNodes().thenCompose(nodes -> {
- List<CuratorOp> operations = new ArrayList<>();
- for ( ZNode<byte[]> node : nodes )
- {
- byte[] currentBytes = node.model();
- for ( Migration migration : toBeApplied )
- {
- currentBytes = migration.migrate(currentBytes);
- MetaData thisMetaData = new MetaData(migration.id(), migration.version());
- operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
- }
- operations.add(modeled.child(node.path().nodeName()).updateOp(currentBytes, node.stat().getVersion()));
- }
- return client.transaction().forOperations(operations).thenApply(__ -> null);
- });
- }
-}
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
index 9e41989..0a0dbe0 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
@@ -27,16 +27,13 @@ public interface MigrationSet
{
String id();
- ZPath path();
-
ZPath metaDataPath();
List<Migration> migrations();
- static MigrationSet build(String id, ZPath path, ZPath metaDataPath, List<Migration> migrations)
+ static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
{
Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(path, "path cannot be null");
Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
return new MigrationSet()
@@ -48,12 +45,6 @@ public interface MigrationSet
}
@Override
- public ZPath path()
- {
- return path;
- }
-
- @Override
public ZPath metaDataPath()
{
return metaDataPath;
http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index d709abe..daf69cd 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.migrations;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -37,7 +38,11 @@ import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;
public class TestMigrationManager extends CompletableBaseClassForTests
@@ -47,6 +52,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
private ModelSpec<ModelV1> v1Spec;
private ModelSpec<ModelV2> v2Spec;
private ModelSpec<ModelV3> v3Spec;
+ private ExecutorService executor;
@BeforeMethod
@Override
@@ -54,10 +60,10 @@ public class TestMigrationManager extends CompletableBaseClassForTests
{
super.setup();
- CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
- client.start();
+ CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+ rawClient.start();
- this.client = AsyncCuratorFramework.wrap(client);
+ this.client = AsyncCuratorFramework.wrap(rawClient);
ObjectMapper mapper = new ObjectMapper();
UnaryOperator<byte[]> from1to2 = bytes -> {
@@ -89,13 +95,20 @@ public class TestMigrationManager extends CompletableBaseClassForTests
ZPath modelPath = ZPath.parse("/test/it");
- Migration m1 = Migration.build("1",1, from1to2);
- Migration m2 = Migration.build("2",1, from2to3);
- migrationSet = MigrationSet.build("1", modelPath, ZPath.parse("/metadata"), Arrays.asList(m1, m2));
-
v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+
+ CuratorOp v1op = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+ CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+ CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
+
+ Migration m1 = Migration.build("1",1, () -> Collections.singletonList(v1op));
+ Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+ Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+ migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
+
+ executor = Executors.newCachedThreadPool();
}
@AfterMethod
@@ -103,6 +116,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
public void teardown() throws Exception
{
CloseableUtils.closeQuietly(client.unwrap());
+ executor.shutdownNow();
super.teardown();
}
@@ -113,11 +127,8 @@ public class TestMigrationManager extends CompletableBaseClassForTests
ModelV1 v1 = new ModelV1("John Galt");
complete(v1Client.child("1").set(v1));
- MigrationManager manager = MigrationManager.builder(this.client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class))
- .adding(migrationSet)
- .build();
-
- complete(manager.run());
+ MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+ complete(manager.migrate(migrationSet));
ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
complete(v3Client.child("1").read(), (m, e) -> {