You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/14 18:32:10 UTC
curator git commit: some more refactoring, tests and doc
Repository: curator
Updated Branches:
refs/heads/CURATOR-421 d80651a7a -> c40a38361
some more refactoring, tests and doc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c40a3836
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c40a3836
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c40a3836
Branch: refs/heads/CURATOR-421
Commit: c40a3836131ad49279e05a5c3dd6656848c1eb60
Parents: d80651a
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 13:32:05 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 13:32:05 2017 -0500
----------------------------------------------------------------------
.../x/async/migrations/MigrationManager.java | 20 ++--
.../x/async/migrations/MigrationSet.java | 14 +--
.../src/site/confluence/index.confluence | 9 +-
.../src/site/confluence/migrations.confluence | 97 ++++++++++++++++++++
curator-x-async/src/site/site.xml | 2 +-
.../async/migrations/TestMigrationManager.java | 36 +++++++-
6 files changed, 150 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
index cb3d6ff..676eef6 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -50,6 +50,7 @@ public class MigrationManager
{
private final AsyncCuratorFramework client;
private final String lockPath;
+ private final String metaDataPath;
private final Executor executor;
private final Duration lockMax;
@@ -58,13 +59,15 @@ public class MigrationManager
/**
* @param client the curator client
* @param lockPath base path for locks used by the manager
+ * @param metaDataPath base path to store the meta data
* @param executor the executor to use
* @param lockMax max time to wait for locks
*/
- public MigrationManager(AsyncCuratorFramework client, String lockPath, Executor executor, Duration lockMax)
+ public MigrationManager(AsyncCuratorFramework client, String lockPath, String metaDataPath, Executor executor, Duration lockMax)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
+ this.metaDataPath = Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
this.executor = Objects.requireNonNull(executor, "executor cannot be null");
this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
}
@@ -139,8 +142,9 @@ public class MigrationManager
private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
{
- return childrenWithData(client, set.metaDataPath())
- .thenCompose(metaData -> applyMetaData(set, metaData))
+ String thisMetaDataPath = ZKPaths.makePath(metaDataPath, set.id());
+ return childrenWithData(client, thisMetaDataPath)
+ .thenCompose(metaData -> applyMetaData(set, metaData, thisMetaDataPath))
.handle((v, e) -> {
release(lock, true);
if ( e != null )
@@ -152,7 +156,7 @@ public class MigrationManager
);
}
- private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData)
+ private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData, String thisMetaDataPath)
{
List<byte[]> sortedMetaData = metaData.keySet()
.stream()
@@ -177,13 +181,13 @@ public class MigrationManager
return CompletableFuture.completedFuture(null);
}
- return asyncEnsureContainers(client, set.metaDataPath())
- .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied));
+ return asyncEnsureContainers(client, thisMetaDataPath)
+ .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath));
}
- private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied)
+ private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath)
{
- String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), META_DATA_NODE_NAME);
+ String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME);
List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
List<CuratorOp> operations = new ArrayList<>();
operations.addAll(migration.operations());
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
index 089d3d8..94b5205 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
@@ -34,19 +34,13 @@ public interface MigrationSet
String id();
/**
- * @return where to store the meta data for this migration set
- */
- String metaDataPath();
-
- /**
* @return list of migrations in the order that they should be applied
*/
List<Migration> migrations();
- static MigrationSet build(String id, String metaDataPath, List<Migration> migrations)
+ static MigrationSet build(String id, List<Migration> migrations)
{
Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
return new MigrationSet()
{
@@ -57,12 +51,6 @@ public interface MigrationSet
}
@Override
- public String metaDataPath()
- {
- return metaDataPath;
- }
-
- @Override
public List<Migration> migrations()
{
return migrationsCopy;
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/index.confluence b/curator-x-async/src/site/confluence/index.confluence
index 74a47b4..64f3786 100644
--- a/curator-x-async/src/site/confluence/index.confluence
+++ b/curator-x-async/src/site/confluence/index.confluence
@@ -40,7 +40,6 @@ This is a strongly typed DSL that allows you to map a Curator\-style client to:
* Options for how nodes should be created (sequential, compressed data, ttl, etc.)
* ACLs for the nodes at the path
* Options for how to delete nodes (guaranteed, deleting children, etc.)
-* Perform ZooKeeper data migration
For example:
@@ -50,3 +49,11 @@ modeled.set(new Foo());
{code}
See [[Modeled Curator|modeled.html]] for details.
+
+h2. [[Migrations|migrations.html]]
+
+Curator Migrations allow you pre\-apply transactions in a staged manner so that you
+can ensure a consistent state for parts of your ZooKeeper node hierarchy in a manner
+similar to database migration utilities.
+
+See [[Migrations|migrations.html]] for details.
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/confluence/migrations.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence
new file mode 100644
index 0000000..4775fac
--- /dev/null
+++ b/curator-x-async/src/site/confluence/migrations.confluence
@@ -0,0 +1,97 @@
+h1. Migrations
+
+Curator Migrations allow you pre\-apply transactions in a staged manner so that you
+can ensure a consistent state for parts of your ZooKeeper node hierarchy in a manner
+similar to database migration utilities.
+
+h2. Background and Usage
+
+Note: To use Migrations, you should be familiar with Java 8's lambdas, CompletedFuture and CompletionStage.
+
+A "migration" is a set of operations to be performed in a transaction. A "migration set" is a list
+of migrations. Combined, this can be used to ensure an initial state for your ZooKeeper nodes as
+well as supporting upgrading/modifying existing state.
+
+For example, given a brand new ZooKeeper instance you might want to populate a few nodes
+and data. E.g.
+
+{code}
+CuratorOp op1 = client.transactionOp().create().forPath("/parent");
+CuratorOp op2 = client.transactionOp().create().forPath("/parent/one");
+CuratorOp op3 = client.transactionOp().create().forPath("/parent/two");
+CuratorOp op4 = client.transactionOp().create().forPath("/parent/three");
+CuratorOp op5 = client.transactionOp().create().forPath("/main", someData);
+{code}
+
+All 5 of these operations would be combined into a migration and set:
+
+{code}
+Migration migration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+MigrationSet set = MigrationSet.build("main", Collections.singletonList(migration));
+{code}
+
+This set can then be passed to a {{MigrationManager}} for processing. The MigrationManager
+checks to see if the migration has been applied already and, if not, processes the transaction.
+
+At a future date, the migration set could be expanded to update/modify things. E.g.
+
+{code}
+CuratorOp newOp1 = client.transactionOp().create().forPath("/new");
+CuratorOp newOp2 = client.transactionOp().delete().forPath("/main"); // maybe this is no longer needed
+{code}
+
+This would be combined with the previous migration:
+
+{code}
+Migration initialMigration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+Migration newMigration = () -> Arrays.asList(newOp1, newOp2);
+MigrationSet set = MigrationSet.build("main", Arrays.asList(initialMigration, newMigration));
+{code}
+
+When this set is run, the MigrationManager will perform both migration operations on new
+ZooKeeper databases but only the second "newMigration" on ZK databases that already have
+the first migration applied.
+
+h2. Details/Reference
+
+_Migration_
+
+A Migration is a wrapper around a list of operations that constitute one stage in a migration
+set and are applied as a single transaction.
+
+_MigrationSet_
+
+A MigrationSet is an ordered list of Migrations. Curator keeps track of which migrations in a
+set have been previously applied and only processes un\-applied migrations. Each migration
+set must have a unique identifier. Create a MigrationSet via its builder:
+
+{code}
+MigrationSet set = MigrationSet.build(migrationId, migrations);
+{code}
+
+_MigrationManager_
+
+The MigrationManager processes MigrationSets. Usually, you'd run this only on new ZooKeeper
+databases or as part of a maintenance operation to update the ZooKeeper database. E.g.
+
+{code}
+MigrationManager manager = new MigrationManager(client,
+ lockPath, // base path for locks used by the manager
+ metaDataPath, // base path to store the meta data
+ executor, // the executor to use
+ lockMax // max time to wait for locks
+);
+manager.migrate(set).exceptionally(e -> {
+ if ( e instanceof MigrationException ) {
+ // migration checksum failed, etc.
+ } else {
+ // some other kind of error
+ }
+ return null;
+});
+{code}
+
+Each migration in the set is applied in a transaction. MigrationManager stores a hash
+of all operations in a migration so that it can be compared for future operations. i.e.
+if, in the future, a migration set is attempted but the hash of one of the previous migrations
+does not match, the stage completes exceptionally with {{MigrationException}}.
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/site.xml
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/site.xml b/curator-x-async/src/site/site.xml
index f78abc7..fc0a67a 100644
--- a/curator-x-async/src/site/site.xml
+++ b/curator-x-async/src/site/site.xml
@@ -25,7 +25,7 @@
<link rel="stylesheet" href="../css/site.css" />
<script type="text/javascript">
$(function(){
- if ( location && location.pathname && location.pathname.endsWith('/index.html') ) {
+ if ( location && location.pathname && (location.pathname.endsWith('/index.html') || location.pathname.endsWith('/migrations.html')) ) {
$('a[title="Java 8/Async"]').parent().addClass("active");
} else {
$('a[title="Strongly Typed Models"]').parent().addClass("active");
http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
index 42bc76d..637aca1 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -109,7 +109,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
executor = Executors.newCachedThreadPool();
- manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
+ manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10));
}
@AfterMethod
@@ -127,7 +127,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Migration m1 = () -> Arrays.asList(v1opA, v1opB);
Migration m2 = () -> Collections.singletonList(v2op);
Migration m3 = () -> Collections.singletonList(v3op);
- MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3));
+ MigrationSet migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));
@@ -143,14 +143,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests
public void testStaged() throws Exception
{
Migration m1 = () -> Arrays.asList(v1opA, v1opB);
- MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1));
+ MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(m1));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
Migration m2 = () -> Collections.singletonList(v2op);
- migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2));
+ migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
@@ -160,7 +160,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
});
Migration m3 = () -> Collections.singletonList(v3op);
- migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3));
+ migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
@@ -170,4 +170,30 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertEquals(m.getLastName(), "Two");
});
}
+
+ @Test
+ public void testDocExample() throws Exception
+ {
+ CuratorOp op1 = client.transactionOp().create().forPath("/parent");
+ CuratorOp op2 = client.transactionOp().create().forPath("/parent/one");
+ CuratorOp op3 = client.transactionOp().create().forPath("/parent/two");
+ CuratorOp op4 = client.transactionOp().create().forPath("/parent/three");
+ CuratorOp op5 = client.transactionOp().create().forPath("/main", "hey".getBytes());
+
+ Migration initialMigration = () -> Arrays.asList(op1, op2, op3, op4, op5);
+ MigrationSet migrationSet = MigrationSet.build("main", Collections.singletonList(initialMigration));
+ complete(manager.migrate(migrationSet));
+
+ Assert.assertNotNull(client.unwrap().checkExists().forPath("/parent/three"));
+ Assert.assertEquals(client.unwrap().getData().forPath("/main"), "hey".getBytes());
+
+ CuratorOp newOp1 = client.transactionOp().create().forPath("/new");
+ CuratorOp newOp2 = client.transactionOp().delete().forPath("/main"); // maybe this is no longer needed
+
+ Migration newMigration = () -> Arrays.asList(newOp1, newOp2);
+ migrationSet = MigrationSet.build("main", Arrays.asList(initialMigration, newMigration));
+ complete(manager.migrate(migrationSet));
+
+ Assert.assertNull(client.unwrap().checkExists().forPath("/main"));
+ }
}