You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/07/18 13:16:31 UTC
[09/23] curator git commit: refactoring and simplification. No need
for ids and versions in Migrations/MetaData. A hash can be auto-generated.
refactoring and simplification. No need for ids and versions in Migrations/MetaData. A hash can be auto-generated.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c8df9a41
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c8df9a41
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c8df9a41
Branch: refs/heads/master
Commit: c8df9a414b9a035f45946460ff7e1adff7fd65d4
Parents: 4f12abc
Author: randgalt <ra...@apache.org>
Authored: Fri Jul 14 12:00:27 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Jul 14 12:00:27 2017 -0500
----------------------------------------------------------------------
.../imps/CuratorMultiTransactionRecord.java | 11 +++
.../framework/imps/ExtractingCuratorOp.java | 8 +-
.../x/async/modeled/migrations/MetaData.java | 74 +-----------------
.../x/async/modeled/migrations/Migration.java | 49 ++----------
.../modeled/migrations/MigrationManager.java | 81 +++++++++++++-------
.../src/site/confluence/index.confluence | 1 +
.../migrations/TestMigrationManager.java | 14 ++--
7 files changed, 87 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
index 0611df6..3e72609 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java
@@ -23,6 +23,7 @@ import org.apache.curator.framework.api.transaction.OperationType;
import org.apache.curator.framework.api.transaction.TypeAndPath;
import org.apache.zookeeper.MultiTransactionRecord;
import org.apache.zookeeper.Op;
+import java.security.MessageDigest;
import java.util.List;
class CuratorMultiTransactionRecord extends MultiTransactionRecord
@@ -50,4 +51,14 @@ class CuratorMultiTransactionRecord extends MultiTransactionRecord
{
return metadata.size();
}
+
+    /**
+     * Feed every operation held by this record into the given digest. For each
+     * operation, in iteration order, the path, the numeric operation type and the
+     * string form of the request record are added.
+     *
+     * Strings are always encoded as UTF-8: the no-argument {@code getBytes()} uses
+     * the JVM's default charset, which would make the resulting hash depend on the
+     * machine that computed it.
+     *
+     * @param digest the digest to update
+     */
+    void addToDigest(MessageDigest digest)
+    {
+        for ( Op op : this )
+        {
+            digest.update(op.getPath().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            digest.update(Integer.toString(op.getType()).getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            digest.update(op.toRequestRecord().toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+        }
+    }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
index 7a5db69..58a1572 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java
@@ -22,8 +22,9 @@ import com.google.common.base.Preconditions;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.TypeAndPath;
import org.apache.zookeeper.Op;
+import java.security.MessageDigest;
-class ExtractingCuratorOp implements CuratorOp
+public class ExtractingCuratorOp implements CuratorOp
{
private final CuratorMultiTransactionRecord record = new CuratorMultiTransactionRecord();
@@ -46,6 +47,11 @@ class ExtractingCuratorOp implements CuratorOp
return record.iterator().next();
}
+    /**
+     * Add the operations recorded by this instance to the given digest.
+     * Delegates to the underlying transaction record; without this delegation
+     * the digest would be left untouched and every operation would hash the same.
+     *
+     * @param digest the digest to update
+     */
+    public void addToDigest(MessageDigest digest)
+    {
+        record.addToDigest(digest);
+    }
+
private void validate()
{
Preconditions.checkArgument(record.size() > 0, "No operation has been added");
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
index da40a5b..8377967 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
@@ -18,76 +18,8 @@
*/
package org.apache.curator.x.async.modeled.migrations;
-import java.util.Objects;
-
-/**
- * The meta data of a single migration
- */
-public class MetaData
+@FunctionalInterface
+public interface MetaData
{
- private final String migrationId;
- private final int migrationVersion;
-
- public MetaData()
- {
- this("", 0);
- }
-
- public MetaData(String migrationId, int migrationVersion)
- {
- this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null");
- this.migrationVersion = migrationVersion;
- }
-
- /**
- * @return The ID of the migration that was applied
- */
- public String getMigrationId()
- {
- return migrationId;
- }
-
- /**
- * @return the version of the migration that was applied
- */
- public int getMigrationVersion()
- {
- return migrationVersion;
- }
-
- @Override
- public boolean equals(Object o)
- {
- if ( this == o )
- {
- return true;
- }
- if ( o == null || getClass() != o.getClass() )
- {
- return false;
- }
-
- MetaData metaData = (MetaData)o;
-
- //noinspection SimplifiableIfStatement
- if ( migrationVersion != metaData.migrationVersion )
- {
- return false;
- }
- return migrationId.equals(metaData.migrationId);
- }
-
- @Override
- public int hashCode()
- {
- int result = migrationId.hashCode();
- result = 31 * result + migrationVersion;
- return result;
- }
-
- @Override
- public String toString()
- {
- return "MetaData{" + "migrationId='" + migrationId + '\'' + ", migrationVersion=" + migrationVersion + '}';
- }
+ byte[] operationHash();
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
index b456580..972c59e 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
@@ -20,57 +20,18 @@ package org.apache.curator.x.async.modeled.migrations;
import org.apache.curator.framework.api.transaction.CuratorOp;
import java.util.List;
-import java.util.Objects;
-import java.util.function.Supplier;
/**
* Models a single migration/transition
*/
+@FunctionalInterface
public interface Migration
{
/**
- * @return the unique ID for this migration
- */
- String id();
-
- /**
- * @return the version of this migration
- */
- int version();
-
- /**
- * @return the operations to execute in a transaction
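+ * Return the operations to execute in a transaction. IMPORTANT: during a migration
+ * this method may be called multiple times, so every call must return an equivalent
+ * list of operations (a hash of the operations is compared against stored meta data).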
+ *
+ * @return operations
*/
List<CuratorOp> operations();
-
- static Migration build(String id, Supplier<List<CuratorOp>> operationsProc)
- {
- return build(id, 1, operationsProc);
- }
-
- static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc)
- {
- Objects.requireNonNull(id, "id cannot be null");
- Objects.requireNonNull(operationsProc, "operationsProc cannot be null");
- return new Migration()
- {
- @Override
- public String id()
- {
- return id;
- }
-
- @Override
- public int version()
- {
- return version;
- }
-
- @Override
- public List<CuratorOp> operations()
- {
- return operationsProc.get();
- }
- };
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
index 01de2f8..d6d37de 100644
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
@@ -20,6 +20,7 @@ package org.apache.curator.x.async.modeled.migrations;
import com.google.common.base.Throwables;
import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.imps.ExtractingCuratorOp;
import org.apache.curator.framework.recipes.locks.InterProcessLock;
import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
import org.apache.curator.x.async.AsyncCuratorFramework;
@@ -29,8 +30,11 @@ import org.apache.curator.x.async.modeled.ModeledFramework;
import org.apache.curator.x.async.modeled.ZNode;
import org.apache.curator.x.async.modeled.ZPath;
import org.apache.zookeeper.CreateMode;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
@@ -49,27 +53,21 @@ public class MigrationManager
{
private final AsyncCuratorFramework client;
private final ZPath lockPath;
- private final ModelSerializer<MetaData> metaDataSerializer;
private final Executor executor;
private final Duration lockMax;
private static final String META_DATA_NODE_NAME = "meta-";
/**
- * Jackson usage: See the note in {@link org.apache.curator.x.async.modeled.JacksonModelSerializer} regarding how the Jackson library is specified in Curator's Maven file.
- * Unless you are not using Jackson pass <code>JacksonModelSerializer.build(MetaData.class)</code> for <code>metaDataSerializer</code>
- *
* @param client the curator client
* @param lockPath base path for locks used by the manager
- * @param metaDataSerializer JacksonModelSerializer.build(MetaData.class)
* @param executor the executor to use
* @param lockMax max time to wait for locks
*/
- public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax)
+ public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax)
{
this.client = Objects.requireNonNull(client, "client cannot be null");
this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
- this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null");
this.executor = Objects.requireNonNull(executor, "executor cannot be null");
this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
}
@@ -90,18 +88,6 @@ public class MigrationManager
}
/**
- * Utility to return the meta data from previous migrations
- *
- * @param set the set
- * @return stage
- */
- public CompletionStage<List<MetaData>> metaData(MigrationSet set)
- {
- ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
- return ZNode.models(modeled.childrenAsZNodes());
- }
-
- /**
* Can be overridden to change how the comparison to previous migrations is done. The default
* version ensures that the meta data from previous migrations matches the current migration
* set exactly (by order and operation hash). If there is a mismatch, <code>MigrationException</code> is thrown.
@@ -115,21 +101,46 @@ public class MigrationManager
{
if ( sortedMetaData.size() > set.migrations().size() )
{
- throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
+ throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
}
int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
- List<MetaData> compareMigrations = set.migrations().subList(0, compareSize)
- .stream()
- .map(m -> new MetaData(m.id(), m.version()))
- .collect(Collectors.toList());
- if ( !compareMigrations.equals(sortedMetaData) )
+ List<Migration> subList = set.migrations().subList(0, compareSize);
+ for ( int i = 0; i < compareSize; ++i )
{
- throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData));
+ byte[] setHash = hash(set.migrations().get(i).operations()).operationHash();
+ if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) )
+ {
+ throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
+ }
}
return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
}
+    /**
+     * Compute the SHA-256 hash of a migration's operations and wrap it as meta data.
+     *
+     * @param operations the operations of a single migration, in execution order
+     * @return meta data whose {@code operationHash()} returns the same bytes on every call
+     * @throws RuntimeException if the SHA-256 algorithm is not available
+     */
+    private MetaData hash(List<CuratorOp> operations)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("SHA-256");
+        }
+        catch ( NoSuchAlgorithmException e )
+        {
+            throw new RuntimeException(e);
+        }
+        for ( CuratorOp op : operations )
+        {
+            if ( op instanceof ExtractingCuratorOp )
+            {
+                ((ExtractingCuratorOp)op).addToDigest(digest);
+            }
+            else
+            {
+                // NOTE(review): this relies on the implementation's toString() being stable
+                // across JVM runs; the default Object.toString() is not - confirm for custom ops.
+                digest.update(op.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            }
+        }
+        // MessageDigest.digest() resets the digest after it is called, so the hash must be
+        // computed exactly once here. Returning digest::digest would yield the hash of empty
+        // input on every call after the first.
+        byte[] hashed = digest.digest();
+        return () -> hashed;
+    }
+
private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
{
ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
@@ -148,7 +159,21 @@ public class MigrationManager
// Builds a modeled client for the meta data nodes under metaDataPath, using an inline serializer.
private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
{
- ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
+ ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>()
+ {
+ @Override
+ public byte[] serialize(MetaData model)
+ {
+ return model.operationHash(); // the serialized form is the raw hash bytes themselves
+ }
+
+ @Override
+ public MetaData deserialize(byte[] bytes)
+ {
+ return () -> bytes; // wraps the given array as-is; no copy is made
+ }
+ };
+ ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build(); // nodes are created as persistent sequential
return ModeledFramework.wrap(client, modelSpec);
}
@@ -186,7 +211,7 @@ public class MigrationManager
List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
List<CuratorOp> operations = new ArrayList<>();
operations.addAll(migration.operations());
- MetaData thisMetaData = new MetaData(migration.id(), migration.version());
+ MetaData thisMetaData = hash(operations);
operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
}).collect(Collectors.toList());
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/site/confluence/index.confluence
----------------------------------------------------------------------
diff --git a/curator-x-async/src/site/confluence/index.confluence b/curator-x-async/src/site/confluence/index.confluence
index 4d81d44..74a47b4 100644
--- a/curator-x-async/src/site/confluence/index.confluence
+++ b/curator-x-async/src/site/confluence/index.confluence
@@ -40,6 +40,7 @@ This is a strongly typed DSL that allows you to map a Curator\-style client to:
* Options for how nodes should be created (sequential, compressed data, ttl, etc.)
* ACLs for the nodes at the path
* Options for how to delete nodes (guaranteed, deleting children, etc.)
+* Perform ZooKeeper data migration
For example:
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
index 3fe5de2..45aa130 100644
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
@@ -109,7 +109,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
executor = Executors.newCachedThreadPool();
- manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10));
+ manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
}
@AfterMethod
@@ -124,9 +124,9 @@ public class TestMigrationManager extends CompletableBaseClassForTests
@Test
public void testBasic() throws Exception
{
- Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
- Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
- Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+ Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+ Migration m2 = () -> Collections.singletonList(v2op);
+ Migration m3 = () -> Collections.singletonList(v3op);
MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));
@@ -142,14 +142,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests
@Test
public void testStaged() throws Exception
{
- Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB));
+ Migration m1 = () -> Arrays.asList(v1opA, v1opB);
MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
complete(manager.migrate(migrationSet));
ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
- Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op));
+ Migration m2 = () -> Collections.singletonList(v2op);
migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
complete(manager.migrate(migrationSet));
@@ -159,7 +159,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests
Assert.assertEquals(m.getAge(), 10);
});
- Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op));
+ Migration m3 = () -> Collections.singletonList(v3op);
migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
complete(manager.migrate(migrationSet));