You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/04 22:00:10 UTC
[ignite-3] 01/01: IGNITE-14389 Implemented conditional update
(invoke). Added ExistenceCondition.
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 4c97c234522e6cd80cd2b4dbca2570b05d05c6fc
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Wed May 5 00:59:37 2021 +0300
IGNITE-14389 Implemented conditional update (invoke). Added ExistenceCondition.
---
.../ignite/internal/affinity/AffinityManager.java | 17 +-
.../metastorage/client/MetaStorageService.java | 32 +-
.../ignite/metastorage/common/Condition.java | 207 ++++----
.../ignite/metastorage/common/Conditions.java | 37 +-
.../ignite/metastorage/common/Operation.java | 45 +-
.../ignite/metastorage/common/Operations.java | 6 +-
modules/metastorage-server/pom.xml | 6 +-
.../metastorage/server/AbstractCondition.java | 13 +
.../internal/metastorage/server/Condition.java | 7 +
.../metastorage/server/ExistenceCondition.java | 44 ++
.../metastorage/server/KeyValueStorage.java | 2 +
.../internal/metastorage/server/Operation.java | 31 ++
.../metastorage/server/RevisionCondition.java | 76 +++
.../server/SimpleInMemoryKeyValueStorage.java | 95 +++-
.../metastorage/server/ValueCondition.java | 49 ++
.../metastorage/server/ExistenceConditionTest.java | 37 ++
.../metastorage/server/RevisionConditionTest.java | 65 +++
.../server/SimpleInMemoryKeyValueStorageTest.java | 547 +++++++++++++++++++++
.../metastorage/server/ValueConditionTest.java | 27 +
.../internal/metastorage/MetaStorageManager.java | 23 +-
.../apache/ignite/internal/app/IgnitionImpl.java | 56 ++-
.../internal/table/distributed/TableManager.java | 19 +-
22 files changed, 1212 insertions(+), 229 deletions(-)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 37690bf..e658ca1 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -37,6 +37,7 @@ import org.apache.ignite.metastorage.common.Operations;
import org.apache.ignite.metastorage.common.WatchEvent;
import org.apache.ignite.metastorage.common.WatchListener;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Affinity manager is responsible for affinity function related logic including calculating affinity assignments.
@@ -143,16 +144,20 @@ public class AffinityManager {
UUID tblId = UUID.fromString(placeholderValue);
try {
- String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
+ byte[] tblNameVal = vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).get().value();
- int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
- .tables().get(name).partitions().value();
- int replicas = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
- .tables().get(name).replicas().value();
+ String name = new String(tblNameVal, StandardCharsets.UTF_8);
+
+ int partitions = configurationMgr.configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY).tables().get(name).partitions().value();
+
+ int replicas = configurationMgr.configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY).tables().get(name).replicas().value();
var key = evt.newEntry().key();
+
metaStorageMgr.invoke(
- Conditions.key(key).value().eq(assignmentVal),
+ Conditions.value(key).eq(assignmentVal),
Operations.put(key, ByteUtils.toBytes(
RendezvousAffinityFunction.assignPartitions(
baselineMgr.nodes(),
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
index 6c24a59..409f90d 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
@@ -203,9 +203,9 @@ public interface MetaStorageService {
* <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
*
* @param condition The condition.
- * @param success Batch of updates which will be atomically applied in case of condition evaluation yields {@code true}.
- * @param failure Batch of updates which will be atomically applied in case of condition evaluation yields {@code false}.
- * @return Future result {@code true} if {@code success} updates were applied, otherwise {@code false}.
+ * @param success The update which will be applied if the condition evaluates to {@code true}.
+ * @param failure The update which will be applied if the condition evaluates to {@code false}.
+ * @return Future result {@code true} if the {@code success} update was applied, otherwise {@code false}.
* @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
* @see Key
* @see Entry
@@ -215,14 +215,34 @@ public interface MetaStorageService {
// TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
@NotNull
CompletableFuture<Boolean> invoke(@NotNull Condition condition,
- @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure);
+ @NotNull Operation success, @NotNull Operation failure);
+
+ /**
+ * Applies a batch of updates conditionally.
+ *
+ * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
+ *
+ * @param condition The condition.
+ * @param success The updates which will be applied if the condition evaluates to {@code true}.
+ * @param failure The updates which will be applied if the condition evaluates to {@code false}.
+ * @return Future result {@code true} if the {@code success} updates were applied, otherwise {@code false}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Key
+ * @see Entry
+ * @see Condition
+ * @see Operation
+ */
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
+ @NotNull
+ CompletableFuture<Boolean> invoke(@NotNull Condition condition,
+ @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure);
+
/**
* Updates an entry for the given key conditionally.
*
* <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
*
- * @param key The key. Couldn't be {@code null}.
* @param condition The condition.
* @param success The update which will be applied in case of condition evaluation yields {@code true}.
* @param failure The update which will be applied in case of condition evaluation yields {@code false}.
@@ -235,7 +255,7 @@ public interface MetaStorageService {
*/
//TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
@NotNull
- CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition,
+ CompletableFuture<Entry> getAndInvoke(@NotNull Condition condition,
@NotNull Operation success, @NotNull Operation failure);
/**
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
index 54049de..abd2edf 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
@@ -17,8 +17,6 @@
package org.apache.ignite.metastorage.common;
-import java.util.Arrays;
-
/**
* Represents a condition for conditional update.
*/
@@ -36,22 +34,11 @@ public final class Condition {
}
/**
- * Tests the given entry on satisfaction of the condition.
- *
- * @param e Entry.
- * @return The result of condition test. {@code true} - if the entry satisfies to the condition,
- * otherwise - {@code false}.
- */
- public boolean test(Entry e) {
- return cond.test(e);
- }
-
- /**
* Represents condition on entry revision. Only one type of condition could be applied to
* the one instance of condition. Subsequent invocations of any method which produces condition will throw
* {@link IllegalStateException}.
*/
- public static final class RevisionCondition implements InnerCondition {
+ public static final class RevisionCondition extends AbstractCondition {
/**
* The type of condition.
*
@@ -62,16 +49,13 @@ public final class Condition {
/** The revision as the condition argument. */
private long rev;
- /** Key of entry, which will be tested for condition. */
- private final Key key;
-
/**
- * Creates a new condition for the given {@code key}.
+ * Constructs a condition by a revision for an entry identified by the given key.
*
- * @param key Key of entry, to be tested for the condition.
+ * @param key Identifies an entry which condition will be applied to.
*/
- RevisionCondition(Key key) {
- this.key = key;
+ RevisionCondition(byte[] key) {
+ super(key);
}
/**
@@ -176,70 +160,27 @@ public final class Condition {
return new Condition(this);
}
- /** {@inheritDoc} */
- @Override public boolean test(Entry e) {
- if ((e.key() == key) || (e.key() != null && e.key().equals(key))) {
- int res = Long.compare(e.revision(), rev);
-
- return type.test(res);
- }
- else
- return false;
- }
-
/**
* Defines possible condition types which can be applied to the revision.
*/
enum Type {
/** Equality condition type. */
- EQUAL {
- @Override public boolean test(long res) {
- return res == 0;
- }
- },
+ EQUAL,
/** Inequality condition type. */
- NOT_EQUAL {
- @Override public boolean test(long res) {
- return res != 0;
- }
- },
+ NOT_EQUAL,
/** Greater than condition type. */
- GREATER {
- @Override public boolean test(long res) {
- return res > 0;
- }
- },
+ GREATER,
/** Less than condition type. */
- LESS {
- @Override public boolean test(long res) {
- return res < 0;
- }
- },
+ LESS,
/** Less than or equal to condition type. */
- LESS_OR_EQUAL {
- @Override public boolean test(long res) {
- return res <= 0;
- }
- },
+ LESS_OR_EQUAL,
/** Greater than or equal to condition type. */
- GREATER_OR_EQUAL {
- @Override public boolean test(long res) {
- return res >= 0;
- }
- };
-
- /**
- * Interprets comparison result.
- *
- * @param res The result of comparison.
- * @return The interpretation of the comparison result.
- */
- public abstract boolean test(long res);
+ GREATER_OR_EQUAL
}
}
@@ -248,7 +189,7 @@ public final class Condition {
* the one instance of condition. Subsequent invocations of any method which produces condition will throw
* {@link IllegalStateException}.
*/
- public static final class ValueCondition implements InnerCondition {
+ public static final class ValueCondition extends AbstractCondition {
/**
* The type of condition.
*
@@ -259,16 +200,13 @@ public final class Condition {
/** The value as the condition argument. */
private byte[] val;
- /** Key of entry, which will be tested for condition. */
- private final Key key;
-
/**
- * Creates a new condition for the given {@code key}.
+ * Constructs a condition by a value for an entry identified by the given key.
*
- * @param key Key of entry, to be tested for the condition.
+ * @param key Identifies an entry which condition will be applied to.
*/
- ValueCondition(Key key) {
- this.key = key;
+ ValueCondition(byte[] key) {
+ super(key);
}
/**
@@ -305,15 +243,69 @@ public final class Condition {
return new Condition(this);
}
- /** {@inheritDoc} */
- @Override public boolean test(Entry e) {
- if ((e.key() == key) || (e.key() != null && e.key().equals(key))) {
- int res = Arrays.compare(e.value(), val);
+ /**
+ * Defines possible condition types which can be applied to the value.
+ */
+ enum Type {
+ /** Equality condition type. */
+ EQUAL,
- return type.test(res);
- }
- else
- return false;
+ /** Inequality condition type. */
+ NOT_EQUAL
+ }
+ }
+
+ //TODO: Javadoc
+ /**
+ * Represents a condition on entry existence. Only one type of condition can be applied to
+ * a single condition instance. Subsequent invocations of any method which produces a condition will throw
+ * {@link IllegalStateException}.
+ */
+ public static final class ExistenceCondition extends AbstractCondition {
+ /**
+ * The type of condition.
+ *
+ * @see Type
+ */
+ private Type type;
+
+ /**
+ * Constructs an existence condition for an entry identified by the given key.
+ *
+ * @param key Identifies an entry which condition will be applied to.
+ */
+ ExistenceCondition(byte[] key) {
+ super(key);
+ }
+
+ /**
+ * Produces the condition of type {@link Type#EXISTS}. This condition tests that the target entry
+ * exists.
+ *
+ * @return The condition of type {@link Type#EXISTS}.
+ * @throws IllegalStateException In case when the condition is already defined.
+ */
+ public Condition exists() {
+ validate(type);
+
+ this.type = Type.EXISTS;
+
+ return new Condition(this);
+ }
+
+ /**
+ * Produces the condition of type {@link Type#NOT_EXISTS}. This condition tests that the target entry
+ * does not exist.
+ *
+ * @return The condition of type {@link Type#NOT_EXISTS}.
+ * @throws IllegalStateException In case when the condition is already defined.
+ */
+ public Condition notExists() {
+ validate(type);
+
+ this.type = Type.NOT_EXISTS;
+
+ return new Condition(this);
}
/**
@@ -321,29 +313,14 @@ public final class Condition {
*/
enum Type {
/** Equality condition type. */
- EQUAL {
- @Override public boolean test(long res) {
- return res == 0;
- }
- },
+ EXISTS,
/** Inequality condition type. */
- NOT_EQUAL {
- @Override public boolean test(long res) {
- return res != 0;
- }
- };
-
- /**
- * Interprets comparison result.
- *
- * @param res The result of comparison.
- * @return The interpretation of the comparison result.
- */
- public abstract boolean test(long res);
+ NOT_EXISTS
}
}
+
/**
* Checks that condition is not defined yet. If the condition is already defined then exception will be thrown.
*
@@ -357,14 +334,24 @@ public final class Condition {
/**
* Defines condition interface.
*/
- private interface InnerCondition {
+ public interface InnerCondition {
/**
- * Tests the given entry on satisfaction of the condition.
+ * Returns key which identifies an entry which condition will be applied to.
*
- * @param e Entry.
- * @return The result of condition test. {@code true} - if the entry satisfies to the condition,
- * otherwise - {@code false}.
+ * @return Key which identifies an entry which condition will be applied to.
*/
- boolean test(Entry e);
+ byte[] key();
+ }
+
+ private static abstract class AbstractCondition implements InnerCondition {
+ private final byte[] key;
+
+ public AbstractCondition(byte[] key) {
+ this.key = key;
+ }
+
+ @Override public byte[] key() {
+ return key;
+ }
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
index 87fa238..2f4d537 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
@@ -24,47 +24,34 @@ package org.apache.ignite.metastorage.common;
* @see Condition
*/
public final class Conditions {
-
- /** Key. */
- private Key key;
-
- /**
- * Creates new condition for entry with concrete key.
- *
- * @param key Key
- */
- private Conditions(Key key) {
- this.key = key;
- }
-
/**
* Creates condition on entry revision.
*
+ * @param key Identifies an entry which condition will be applied to.
* @return Condition on entry revision.
* @see Condition.RevisionCondition
*/
- public Condition.RevisionCondition revision() {
- return new Condition.RevisionCondition(key);
+ public static Condition.RevisionCondition revision(Key key) {
+ return new Condition.RevisionCondition(key.bytes());
}
/**
* Creates condition on entry value.
*
+ * @param key Identifies an entry which condition will be applied to.
* @return Condition on entry value.
* @see Condition.ValueCondition
*/
- public Condition.ValueCondition value() {
- return new Condition.ValueCondition(key);
+ public static Condition.ValueCondition value(Key key) {
+ return new Condition.ValueCondition(key.bytes());
}
- /**
- * Creates key-based condition.
- *
- * @param key Key of condition.
- * @return Key-based condition instance.
- */
- public static Conditions key(Key key) {
- return new Conditions(key);
+ public static Condition exists(Key key) {
+ return new Condition.ExistenceCondition(key.bytes()).exists();
+ }
+
+ public static Condition notExists(Key key) {
+ return new Condition.ExistenceCondition(key.bytes()).notExists();
}
/**
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
index 9810fe5..fd5428a 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
@@ -17,6 +17,8 @@
package org.apache.ignite.metastorage.common;
+import org.jetbrains.annotations.Nullable;
+
/**
* Defines operation for meta storage conditional update (invoke).
*/
@@ -37,37 +39,33 @@ public final class Operation {
/**
* Represents operation of type <i>remove</i>.
*/
- public static final class RemoveOp implements InnerOp {
- /** Key. */
- private final Key key;
-
+ public static final class RemoveOp extends AbstractOp {
/**
- * Creates a new remove operation for the given {@code key}.
+ * Constructs operation of type <i>remove</i>.
*
- * @param key Key.
+ * @param key Identifies an entry which operation will be applied to.
*/
- RemoveOp(Key key) {
- this.key = key;
+ RemoveOp(byte[] key) {
+ super(key);
}
}
/**
* Represents operation of type <i>put</i>.
*/
- public static final class PutOp implements InnerOp {
- /** Key. */
- private final Key key;
-
+ public static final class PutOp extends AbstractOp {
/** Value. */
private final byte[] val;
/**
* Constructs operation of type <i>put</i>.
*
+ * @param key Identifies an entry which operation will be applied to.
* @param val The value to which the entry should be updated.
*/
- PutOp(Key key, byte[] val) {
- this.key = key;
+ PutOp(byte[] key, byte[] val) {
+ super(key);
+
this.val = val;
}
}
@@ -75,12 +73,12 @@ public final class Operation {
/**
* Represents operation of type <i>no-op</i>.
*/
- public static final class NoOp implements InnerOp {
+ public static final class NoOp extends AbstractOp {
/**
* Default no-op constructor.
*/
NoOp() {
- // No-op.
+ super(null);
}
}
@@ -88,6 +86,19 @@ public final class Operation {
* Defines operation interface.
*/
private interface InnerOp {
- // Marker interface.
+ @Nullable byte[] key();
+ }
+
+ private static class AbstractOp implements InnerOp {
+ @Nullable private final byte[] key;
+
+ public AbstractOp(@Nullable byte[] key) {
+ this.key = key;
+ }
+
+ @Nullable
+ @Override public byte[] key() {
+ return key;
+ }
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
index 31c7449..e51fa9e 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
@@ -30,20 +30,22 @@ public final class Operations {
/**
* Creates operation of type <i>remove</i>. This type of operation removes entry.
*
+ * @param key Identifies an entry which operation will be applied to.
* @return Operation of type <i>remove</i>.
*/
public static Operation remove(Key key) {
- return new Operation(new Operation.RemoveOp(key));
+ return new Operation(new Operation.RemoveOp(key.bytes()));
}
/**
* Creates operation of type <i>put</i>. This type of operation inserts or updates value of entry.
*
+ * @param key Identifies an entry which operation will be applied to.
* @param value Value.
* @return Operation of type <i>put</i>.
*/
public static Operation put(Key key, byte[] value) {
- return new Operation(new Operation.PutOp(key, value));
+ return new Operation(new Operation.PutOp(key.bytes(), value));
}
/**
diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml
index d73d080..5f0f453 100644
--- a/modules/metastorage-server/pom.xml
+++ b/modules/metastorage-server/pom.xml
@@ -29,20 +29,18 @@
<relativePath>../../parent/pom.xml</relativePath>
</parent>
- <artifactId>metastorage-server</artifactId>
+ <artifactId>ignite-metastorage-server</artifactId>
<version>3.0.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
- <version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>metastorage-common</artifactId>
- <version>${project.version}</version>
+ <artifactId>ignite-metastorage-common</artifactId>
</dependency>
<dependency>
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java
new file mode 100644
index 0000000..55ec9e6
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java
@@ -0,0 +1,13 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public abstract class AbstractCondition implements Condition {
+ private final byte[] key;
+
+ public AbstractCondition(byte[] key) {
+ this.key = key;
+ }
+
+ @Override public byte[] key() {
+ return key;
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
new file mode 100644
index 0000000..ea4fcc7
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java
@@ -0,0 +1,7 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public interface Condition {
+ byte[] key();
+
+ boolean test(Entry e);
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ExistenceCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ExistenceCondition.java
new file mode 100644
index 0000000..13cd96c
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ExistenceCondition.java
@@ -0,0 +1,44 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public class ExistenceCondition extends AbstractCondition {
+ private final Type type;
+
+ public ExistenceCondition(Type type, byte[] key) {
+ super(key);
+
+ this.type = type;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Entry e) {
+ boolean res = !(e.empty() || e.tombstone());
+
+ return type.test(res);
+ }
+
+
+ public enum Type {
+ /** Equality condition type. */
+ EXISTS {
+ @Override public boolean test(boolean res) {
+ return res;
+ }
+ },
+
+ /** Non-existence condition type. */
+ NOT_EXISTS {
+ @Override public boolean test(boolean res) {
+ return !res;
+ }
+ };
+
+ /**
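+ * Interprets the result of the existence check.
+ *
+ * @param res {@code true} if the tested entry is neither empty nor a tombstone, otherwise {@code false}.
+ * @return {@code true} if a condition of this type is satisfied by {@code res}, otherwise {@code false}.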
+ */
+ public abstract boolean test(boolean res);
+ }
+
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index 5d6da44..5659d3e 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -59,6 +59,8 @@ public interface KeyValueStorage {
@NotNull
Collection<Entry> getAndRemoveAll(List<byte[]> keys);
+ boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure);
+
Cursor<Entry> range(byte[] keyFrom, byte[] keyTo);
Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound);
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java
new file mode 100644
index 0000000..2d1fbfd
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java
@@ -0,0 +1,31 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public class Operation {
+ private final byte[] key;
+ private final byte[] val;
+ private final Type type;
+
+ public Operation(Type type, byte[] key, byte[] val) {
+ this.key = key;
+ this.val = val;
+ this.type = type;
+ }
+
+ byte[] key() {
+ return key;
+ }
+
+ byte[] value() {
+ return val;
+ }
+
+ Type type() {
+ return type;
+ }
+
+ enum Type {
+ PUT,
+ REMOVE,
+ NO_OP
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java
new file mode 100644
index 0000000..2f3727b
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java
@@ -0,0 +1,76 @@
+package org.apache.ignite.internal.metastorage.server;
+
+public class RevisionCondition extends AbstractCondition {
+ private final Type type;
+
+ private final long rev;
+
+ public RevisionCondition(Type type, byte[] key, long rev) {
+ super(key);
+
+ this.type = type;
+ this.rev = rev;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Entry e) {
+ int res = Long.compare(e.revision(), rev);
+
+ return type.test(res);
+ }
+
+ /**
+ * Defines possible condition types which can be applied to the revision.
+ */
+ public enum Type {
+ /** Equality condition type. */
+ EQUAL {
+ @Override public boolean test(long res) {
+ return res == 0;
+ }
+ },
+
+ /** Inequality condition type. */
+ NOT_EQUAL {
+ @Override public boolean test(long res) {
+ return res != 0;
+ }
+ },
+
+ /** Greater than condition type. */
+ GREATER {
+ @Override public boolean test(long res) {
+ return res > 0;
+ }
+ },
+
+ /** Less than condition type. */
+ LESS {
+ @Override public boolean test(long res) {
+ return res < 0;
+ }
+ },
+
+ /** Less than or equal to condition type. */
+ LESS_OR_EQUAL {
+ @Override public boolean test(long res) {
+ return res <= 0;
+ }
+ },
+
+ /** Greater than or equal to condition type. */
+ GREATER_OR_EQUAL {
+ @Override public boolean test(long res) {
+ return res >= 0;
+ }
+ };
+
+ /**
+ * Interprets comparison result.
+ *
+ * @param res The result of comparison.
+ * @return The interpretation of the comparison result.
+ */
+ public abstract boolean test(long res);
+ }
+}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index b37c96a..3033f2c 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -29,6 +29,7 @@ import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.Predicate;
+
import org.apache.ignite.metastorage.common.Cursor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.TestOnly;
@@ -63,14 +64,22 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
@Override public void put(byte[] key, byte[] value) {
synchronized (mux) {
- doPut(key, value);
+ long curRev = rev + 1;
+
+ doPut(key, value, curRev);
+
+ rev = curRev;
}
}
@NotNull
@Override public Entry getAndPut(byte[] key, byte[] bytes) {
synchronized (mux) {
- long lastRev = doPut(key, bytes);
+ long curRev = rev + 1;
+
+ long lastRev = doPut(key, bytes, curRev);
+
+ rev = curRev;
// Return previous value.
return doGetValue(key, lastRev);
@@ -129,15 +138,24 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
@Override
public void remove(byte[] key) {
synchronized (mux) {
- Entry e = doGet(key, LATEST_REV, false);
-
- if (e.empty() || e.tombstone())
- return;
+ long curRev = rev + 1;
- doPut(key, TOMBSTONE);
+ if (doRemove(key, curRev))
+ rev = curRev;
}
}
+ private boolean doRemove(byte[] key, long curRev) {
+ Entry e = doGet(key, LATEST_REV, false);
+
+ if (e.empty() || e.tombstone())
+ return false;
+
+ doPut(key, TOMBSTONE, curRev);
+
+ return true;
+ }
+
@NotNull
@Override public Entry getAndRemove(byte[] key) {
synchronized (mux) {
@@ -204,6 +222,47 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return res;
}
+ @Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
+ synchronized (mux) {
+ Entry e = get(condition.key());
+
+ boolean branch = condition.test(e);
+
+ Collection<Operation> ops = branch ? success : failure;
+
+ long curRev = rev + 1;
+
+ boolean modified = false;
+
+ for (Operation op : ops) {
+ switch (op.type()) {
+ case PUT:
+ doPut(op.key(), op.value(), curRev);
+
+ modified = true;
+
+ break;
+
+ case REMOVE:
+ modified |= doRemove(op.key(), curRev);
+
+ break;
+
+ case NO_OP:
+ break;
+
+ default:
+ throw new IllegalArgumentException("Unknown operation type: " + op.type());
+ }
+ }
+
+ if (modified)
+ rev = curRev;
+
+ return branch;
+ }
+ }
+
@Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
return new RangeCursor(keyFrom, keyTo, rev);
}
@@ -365,9 +424,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return new Entry(key, lastVal.bytes() , lastRev, lastVal.updateCounter());
}
- private long doPut(byte[] key, byte[] bytes) {
- long curRev = ++rev;
-
+ private long doPut(byte[] key, byte[] bytes, long curRev) {
long curUpdCntr = ++updCntr;
// Update keysIdx.
@@ -378,13 +435,25 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
revs.add(curRev);
// Update revsIdx.
- NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
+ //NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
Value val = new Value(bytes, curUpdCntr);
- entries.put(key, val);
+ //entries.put(key, val);
- revsIdx.put(curRev, entries);
+ //revsIdx.put(curRev, entries);
+
+ revsIdx.compute(
+ curRev,
+ (rev, entries) -> {
+ if (entries == null)
+ entries = new TreeMap<>(CMP);
+
+ entries.put(key, val);
+
+ return entries;
+ }
+ );
return lastRev;
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java
new file mode 100644
index 0000000..dfd9f8e
--- /dev/null
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java
@@ -0,0 +1,49 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import java.util.Arrays;
+
+/**
+ * Condition which compares the value of an entry with an expected byte array
+ * and interprets the comparison result according to the condition type.
+ */
+public class ValueCondition extends AbstractCondition {
+ /** Condition type which interprets the comparison result. */
+ private final Type type;
+
+ /** Byte array the entry value is compared with. */
+ private final byte[] val;
+
+ /**
+ * @param type Condition type.
+ * @param key Key passed to the parent condition.
+ * @param val Byte array the entry value is compared with.
+ */
+ public ValueCondition(Type type, byte[] key, byte[] val) {
+ super(key);
+ this.type = type;
+ this.val = val;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean test(Entry e) {
+ // Zero means that the entry value and the expected value have identical content.
+ int res = Arrays.compare(e.value(), val);
+
+ return type.test(res);
+ }
+
+ /**
+ * Defines possible condition types which can be applied to the value.
+ */
+ enum Type {
+ /** Equality condition type. */
+ EQUAL {
+ @Override public boolean test(long res) {
+ return res == 0;
+ }
+ },
+
+ /** Inequality condition type. */
+ NOT_EQUAL {
+ @Override public boolean test(long res) {
+ return res != 0;
+ }
+ };
+
+ /**
+ * Interprets comparison result.
+ *
+ * @param res The result of comparison.
+ * @return The interpretation of the comparison result.
+ */
+ public abstract boolean test(long res);
+ }
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ExistenceConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ExistenceConditionTest.java
new file mode 100644
index 0000000..679b3ad
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ExistenceConditionTest.java
@@ -0,0 +1,37 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ExistenceCondition}: each condition type is checked against
+ * a regular entry, an empty entry and a tombstone.
+ */
+public class ExistenceConditionTest {
+ /** Key shared by all test entries. */
+ private static final byte[] KEY = new byte[] {1};
+
+ /** Value of the regular entry. */
+ private static final byte[] VAL = new byte[] {11};
+
+ /** Regular entry with a value. */
+ private static final Entry ENTRY = new Entry(KEY, VAL, 1, 1);
+
+ /** Empty entry created via {@code Entry.empty}. */
+ private static final Entry EMPTY = Entry.empty(KEY);
+
+ /** Tombstone entry created via {@code Entry.tombstone}. */
+ private static final Entry TOMBSTONE = Entry.tombstone(KEY, 1, 1);
+
+ /** Expects EXISTS to hold for the regular entry only. */
+ @Test
+ public void exists() {
+ ExistenceCondition cond = new ExistenceCondition(ExistenceCondition.Type.EXISTS, KEY);
+
+ assertTrue(cond.test(ENTRY));
+ assertFalse(cond.test(EMPTY));
+ assertFalse(cond.test(TOMBSTONE));
+ }
+
+ /** Expects NOT_EXISTS to hold for the empty entry and the tombstone, but not for the regular entry. */
+ @Test
+ public void notExists() {
+ ExistenceCondition cond = new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, KEY);
+
+ assertFalse(cond.test(ENTRY));
+ assertTrue(cond.test(EMPTY));
+ assertTrue(cond.test(TOMBSTONE));
+ }
+
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java
new file mode 100644
index 0000000..3a08c13
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java
@@ -0,0 +1,65 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link RevisionCondition}: every condition type is checked with an entry
+ * whose revision (third constructor argument of {@link Entry}) satisfies the condition.
+ */
+public class RevisionConditionTest {
+ /** Key shared by all test entries. */
+ private static final byte[] key = new byte[] {1};
+
+ /** Value shared by all test entries. */
+ private static final byte[] val = new byte[] {2};
+
+ /** Entry revision equal to the condition revision satisfies EQUAL. */
+ @Test
+ public void eq() {
+ RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.EQUAL, key, 1);
+
+ // 1 == 1.
+ assertTrue(cond.test(new Entry(key, val, 1, 1)));
+ }
+
+ /** Entry revision different from the condition revision satisfies NOT_EQUAL. */
+ @Test
+ public void ne() {
+ RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.NOT_EQUAL, key, 1);
+
+ // 2 != 1.
+ assertTrue(cond.test(new Entry(key, val, 2, 1)));
+ }
+
+ /** Entry revision above the condition revision satisfies GREATER. */
+ @Test
+ public void gt() {
+ RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.GREATER, key, 1);
+
+ // 2 > 1.
+ assertTrue(cond.test(new Entry(key, val, 2, 1)));
+ }
+
+ /** Both the strictly greater and the equal revision satisfy GREATER_OR_EQUAL. */
+ @Test
+ public void ge() {
+ RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.GREATER_OR_EQUAL, key, 1);
+
+ // 2 >= 1 (2 > 1).
+ assertTrue(cond.test(new Entry(key, val, 2, 1)));
+
+ // 1 >= 1 (1 == 1).
+ assertTrue(cond.test(new Entry(key, val, 1, 1)));
+ }
+
+ /** Entry revision below the condition revision satisfies LESS. */
+ @Test
+ public void lt() {
+ RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.LESS, key, 2);
+
+ // 1 < 2
+ assertTrue(cond.test(new Entry(key, val, 1, 1)));
+ }
+
+ /** Both the strictly less and the equal revision satisfy LESS_OR_EQUAL. */
+ @Test
+ public void le() {
+ RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.LESS_OR_EQUAL, key, 2);
+
+ // 1 <= 2 (1 < 2)
+ assertTrue(cond.test(new Entry(key, val, 1, 1)));
+
+ // 2 <= 2 (2 == 2): the equality boundary, which the previous duplicate assertion never exercised.
+ assertTrue(cond.test(new Entry(key, val, 2, 1)));
+ }
+}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 27df790..4829bc4 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -939,6 +939,553 @@ class SimpleInMemoryKeyValueStorageTest {
}
@Test
+ public void invokeWithRevisionCondition_successBranch() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ boolean branch = storage.invoke(
+ new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
+ List.of(
+ new Operation(Operation.Type.PUT, key1, val1_2),
+ new Operation(Operation.Type.PUT, key2, val2)
+ ),
+ List.of(new Operation(Operation.Type.PUT, key3, val3))
+ );
+
+ // "Success" branch is applied.
+ assertTrue(branch);
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e1 = storage.get(key1);
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertEquals(2, e1.revision());
+ assertEquals(2, e1.updateCounter());
+ assertArrayEquals(val1_2, e1.value());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertArrayEquals(val2, e2.value());
+
+ // "Failure" branch isn't applied.
+ Entry e3 = storage.get(key3);
+
+ assertTrue(e3.empty());
+ }
+
+ @Test
+ public void invokeWithRevisionCondition_failureBranch() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ boolean branch = storage.invoke(
+ new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
+ List.of(new Operation(Operation.Type.PUT, key3, val3)),
+ List.of(
+ new Operation(Operation.Type.PUT, key1, val1_2),
+ new Operation(Operation.Type.PUT, key2, val2)
+ )
+ );
+
+ // "Failure" branch is applied.
+ assertFalse(branch);
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e1 = storage.get(key1);
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertEquals(2, e1.revision());
+ assertEquals(2, e1.updateCounter());
+ assertArrayEquals(val1_2, e1.value());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertArrayEquals(val2, e2.value());
+
+ // "Success" branch isn't applied.
+ Entry e3 = storage.get(key3);
+
+ assertTrue(e3.empty());
+ }
+
+ @Test
+ public void invokeWithExistsCondition_successBranch() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ boolean branch = storage.invoke(
+ new ExistenceCondition(ExistenceCondition.Type.EXISTS, key1),
+ List.of(
+ new Operation(Operation.Type.PUT, key1, val1_2),
+ new Operation(Operation.Type.PUT, key2, val2)
+ ),
+ List.of(new Operation(Operation.Type.PUT, key3, val3))
+ );
+
+ // "Success" branch is applied.
+ assertTrue(branch);
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e1 = storage.get(key1);
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertEquals(2, e1.revision());
+ assertEquals(2, e1.updateCounter());
+ assertArrayEquals(val1_2, e1.value());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertArrayEquals(val2, e2.value());
+
+ // "Failure" branch isn't applied.
+ Entry e3 = storage.get(key3);
+
+ assertTrue(e3.empty());
+ }
+
+ @Test
+ public void invokeWithExistsCondition_failureBranch() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ boolean branch = storage.invoke(
+ new ExistenceCondition(ExistenceCondition.Type.EXISTS, key3),
+ List.of(new Operation(Operation.Type.PUT, key3, val3)),
+ List.of(
+ new Operation(Operation.Type.PUT, key1, val1_2),
+ new Operation(Operation.Type.PUT, key2, val2)
+ )
+ );
+
+ // "Failure" branch is applied.
+ assertFalse(branch);
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e1 = storage.get(key1);
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertEquals(2, e1.revision());
+ assertEquals(2, e1.updateCounter());
+ assertArrayEquals(val1_2, e1.value());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertArrayEquals(val2, e2.value());
+
+ // "Success" branch isn't applied.
+ Entry e3 = storage.get(key3);
+
+ assertTrue(e3.empty());
+ }
+
+ @Test
+ public void invokeWithNotExistsCondition_successBranch() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ boolean branch = storage.invoke(
+ new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key2),
+ List.of(
+ new Operation(Operation.Type.PUT, key1, val1_2),
+ new Operation(Operation.Type.PUT, key2, val2)
+ ),
+ List.of(new Operation(Operation.Type.PUT, key3, val3))
+ );
+
+ // "Success" branch is applied.
+ assertTrue(branch);
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e1 = storage.get(key1);
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertEquals(2, e1.revision());
+ assertEquals(2, e1.updateCounter());
+ assertArrayEquals(val1_2, e1.value());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertArrayEquals(val2, e2.value());
+
+ // "Failure" branch isn't applied.
+ Entry e3 = storage.get(key3);
+
+ assertTrue(e3.empty());
+ }
+
+ @Test
+ public void invokeWithNotExistsCondition_failureBranch() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ boolean branch = storage.invoke(
+ new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key1),
+ List.of(new Operation(Operation.Type.PUT, key3, val3)),
+ List.of(
+ new Operation(Operation.Type.PUT, key1, val1_2),
+ new Operation(Operation.Type.PUT, key2, val2)
+ )
+ );
+
+ // "Failure" branch is applied.
+ assertFalse(branch);
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e1 = storage.get(key1);
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertEquals(2, e1.revision());
+ assertEquals(2, e1.updateCounter());
+ assertArrayEquals(val1_2, e1.value());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertArrayEquals(val2, e2.value());
+
+ // "Success" branch isn't applied.
+ Entry e3 = storage.get(key3);
+
+ assertTrue(e3.empty());
+ }
+
+ @Test
+ public void invokeWithValueCondition_successBranch() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ boolean branch = storage.invoke(
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
+ List.of(
+ new Operation(Operation.Type.PUT, key1, val1_2),
+ new Operation(Operation.Type.PUT, key2, val2)
+ ),
+ List.of(new Operation(Operation.Type.PUT, key3, val3))
+ );
+
+ // "Success" branch is applied.
+ assertTrue(branch);
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e1 = storage.get(key1);
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertEquals(2, e1.revision());
+ assertEquals(2, e1.updateCounter());
+ assertArrayEquals(val1_2, e1.value());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertArrayEquals(val2, e2.value());
+
+ // "Failure" branch isn't applied.
+ Entry e3 = storage.get(key3);
+
+ assertTrue(e3.empty());
+ }
+
+ @Test
+ public void invokeWithValueCondition_failureBranch() {
+ byte[] key1 = k(1);
+ byte[] val1_1 = kv(1, 11);
+ byte[] val1_2 = kv(1, 12);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1_1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ boolean branch = storage.invoke(
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
+ List.of(new Operation(Operation.Type.PUT, key3, val3)),
+ List.of(
+ new Operation(Operation.Type.PUT, key1, val1_2),
+ new Operation(Operation.Type.PUT, key2, val2)
+ )
+ );
+
+ // "Failure" branch is applied.
+ assertFalse(branch);
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e1 = storage.get(key1);
+
+ assertFalse(e1.empty());
+ assertFalse(e1.tombstone());
+ assertEquals(2, e1.revision());
+ assertEquals(2, e1.updateCounter());
+ assertArrayEquals(val1_2, e1.value());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(3, e2.updateCounter());
+ assertArrayEquals(val2, e2.value());
+
+ // "Success" branch isn't applied.
+ Entry e3 = storage.get(key3);
+
+ assertTrue(e3.empty());
+ }
+
+ @Test
+ public void invokeOperations() {
+ byte[] key1 = k(1);
+ byte[] val1 = kv(1, 1);
+
+ byte[] key2 = k(2);
+ byte[] val2 = kv(2, 2);
+
+ byte[] key3 = k(3);
+ byte[] val3 = kv(3, 3);
+
+ assertEquals(0, storage.revision());
+ assertEquals(0, storage.updateCounter());
+
+ storage.put(key1, val1);
+
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ // No-op.
+ boolean branch = storage.invoke(
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+ List.of(new Operation(Operation.Type.NO_OP, null, null)),
+ List.of(new Operation(Operation.Type.NO_OP, null, null))
+ );
+
+ assertTrue(branch);
+
+ // No updates.
+ assertEquals(1, storage.revision());
+ assertEquals(1, storage.updateCounter());
+
+ // Put.
+ branch = storage.invoke(
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+ List.of(
+ new Operation(Operation.Type.PUT, key2, val2),
+ new Operation(Operation.Type.PUT, key3, val3)
+ ),
+ List.of(new Operation(Operation.Type.NO_OP, null, null))
+ );
+
+ assertTrue(branch);
+
+ // +1 for revision, +2 for update counter.
+ assertEquals(2, storage.revision());
+ assertEquals(3, storage.updateCounter());
+
+ Entry e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertFalse(e2.tombstone());
+ assertEquals(2, e2.revision());
+ assertEquals(2, e2.updateCounter());
+ assertArrayEquals(key2, e2.key());
+ assertArrayEquals(val2, e2.value());
+
+ Entry e3 = storage.get(key3);
+
+ assertFalse(e3.empty());
+ assertFalse(e3.tombstone());
+ assertEquals(2, e3.revision());
+ assertEquals(3, e3.updateCounter());
+ assertArrayEquals(key3, e3.key());
+ assertArrayEquals(val3, e3.value());
+
+ // Remove.
+ branch = storage.invoke(
+ new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
+ List.of(
+ new Operation(Operation.Type.REMOVE, key2, null),
+ new Operation(Operation.Type.REMOVE, key3, null)
+ ),
+ List.of(new Operation(Operation.Type.NO_OP, null, null))
+ );
+
+ assertTrue(branch);
+
+ // +1 for revision, +2 for update counter.
+ assertEquals(3, storage.revision());
+ assertEquals(5, storage.updateCounter());
+
+ e2 = storage.get(key2);
+
+ assertFalse(e2.empty());
+ assertTrue(e2.tombstone());
+ assertEquals(3, e2.revision());
+ assertEquals(4, e2.updateCounter());
+ assertArrayEquals(key2, e2.key());
+
+ e3 = storage.get(key3);
+
+ assertFalse(e3.empty());
+ assertTrue(e3.tombstone());
+ assertEquals(3, e3.revision());
+ assertEquals(5, e3.updateCounter());
+ assertArrayEquals(key3, e3.key());
+ }
+
+ @Test
public void compact() {
assertEquals(0, storage.revision());
assertEquals(0, storage.updateCounter());
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java
new file mode 100644
index 0000000..717da54
--- /dev/null
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java
@@ -0,0 +1,27 @@
+package org.apache.ignite.internal.metastorage.server;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link ValueCondition}: each condition type is checked with an entry
+ * whose value satisfies the condition.
+ */
+public class ValueConditionTest {
+ /** Key shared by all test entries. */
+ private static final byte[] key = new byte[] {1};
+
+ /** Value the conditions are created with. */
+ private static final byte[] val1 = new byte[] {11};
+
+ /** Value which differs from {@code val1}. */
+ private static final byte[] val2 = new byte[] {22};
+
+ /** Entry holding the expected value satisfies EQUAL. */
+ @Test
+ public void eq() {
+ ValueCondition cond = new ValueCondition(ValueCondition.Type.EQUAL, key, val1);
+
+ assertTrue(cond.test(new Entry(key, val1, 1, 1)));
+ }
+
+ /** Entry holding a different value satisfies NOT_EQUAL. */
+ @Test
+ public void ne() {
+ ValueCondition cond = new ValueCondition(ValueCondition.Type.NOT_EQUAL, key, val1);
+
+ assertTrue(cond.test(new Entry(key, val2, 1, 1)));
+ }
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 669b937..fca2125 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.metastorage;
import java.util.Collection;
-import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -308,37 +307,36 @@ import org.jetbrains.annotations.Nullable;
/**
* Invoke with single success/failure operation.
*
- * @see MetaStorageService#invoke(Condition, Collection, Collection)
+ * @see MetaStorageService#invoke(Condition, Operation, Operation)
*/
public @NotNull CompletableFuture<Boolean> invoke(
@NotNull Condition cond,
@NotNull Operation success,
@NotNull Operation failure
) {
- return metaStorageSvc.invoke(cond, Collections.singletonList(success), Collections.singletonList(failure));
+ return metaStorageSvc.invoke(cond, success, failure);
}
/**
* @see MetaStorageService#invoke(Condition, Collection, Collection)
*/
public @NotNull CompletableFuture<Boolean> invoke(
- @NotNull Condition cond,
- @NotNull Collection<Operation> success,
- @NotNull Collection<Operation> failure
+ @NotNull Condition cond,
+ @NotNull Collection<Operation> success,
+ @NotNull Collection<Operation> failure
) {
return metaStorageSvc.invoke(cond, success, failure);
}
/**
- * @see MetaStorageService#getAndInvoke(Key, Condition, Operation, Operation)
+ * @see MetaStorageService#getAndInvoke(Condition, Operation, Operation)
*/
public @NotNull CompletableFuture<Entry> getAndInvoke(
- @NotNull Key key,
@NotNull Condition cond,
@NotNull Operation success,
@NotNull Operation failure
) {
- return metaStorageSvc.getAndInvoke(key, cond, success, failure);
+ return metaStorageSvc.getAndInvoke(cond, success, failure);
}
/**
@@ -429,9 +427,10 @@ import org.jetbrains.annotations.Nullable;
try {
return vaultMgr.putAll(entries.stream().collect(
Collectors.toMap(
- e -> ByteArray.fromString(e.getKey().toString()),
- IgniteBiTuple::getValue)),
- revision);
+ e -> ByteArray.fromString(e.getKey().toString()),
+ IgniteBiTuple::getValue)
+ ),
+ revision);
}
catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException("Couldn't put entries with considered revision.", e);
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 4e608eb..8fccaba 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
import org.apache.ignite.configuration.RootKey;
@@ -202,9 +201,7 @@ public class IgnitionImpl implements Ignition {
private static void ackBanner() {
String ver = IgniteProperties.get(VER_KEY);
- String banner = Arrays
- .stream(BANNER)
- .collect(Collectors.joining("\n"));
+ String banner = String.join("\n", BANNER);
LOG.info(banner + '\n' + " ".repeat(22) + "Apache Ignite ver. " + ver + '\n');
}
@@ -213,70 +210,79 @@ public class IgnitionImpl implements Ignition {
private static MetaStorageService metaStorageServiceMock() {
return new MetaStorageService() {
@Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, @NotNull byte[] value) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override
public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition,
- @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ @NotNull Operation success, @NotNull Operation failure) {
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
- @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition,
- @NotNull Operation success, @NotNull Operation failure) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition,
+ @NotNull Collection<Operation> success,
+ @NotNull Collection<Operation> failure
+ ) {
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Condition condition,
+ @NotNull Operation success,
+ @NotNull Operation failure
+ ) {
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<IgniteUuid> watch(@Nullable Key keyFrom, @Nullable Key keyTo,
@@ -295,11 +301,11 @@ public class IgnitionImpl implements Ignition {
}
@Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
@Override public @NotNull CompletableFuture<Void> compact() {
- throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ throw new UnsupportedOperationException("Meta storage service is not implemented yet");
}
};
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 012d57b..809bc40 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -81,7 +81,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private CompletableFuture<Long> tableCreationSubscriptionFut;
/** Tables. */
- private Map<String, TableImpl> tables = new ConcurrentHashMap<>();
+ private final Map<String, TableImpl> tables = new ConcurrentHashMap<>();
/*
* @param configurationMgr Configuration manager.
@@ -228,7 +228,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
.tables().listen(ctx -> {
Set<String> tablesToStart = ctx.newValue().namedListKeys() == null ?
- Collections.EMPTY_SET : ctx.newValue().namedListKeys();
+ Collections.emptySet() : ctx.newValue().namedListKeys();
tablesToStart.removeAll(ctx.oldValue().namedListKeys());
@@ -242,18 +242,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
UUID tblId = new UUID(revision, update);
- var key = new Key(INTERNAL_PREFIX + tblId.toString());
+ var key = new Key(INTERNAL_PREFIX + tblId);
+
futs.add(metaStorageMgr.invoke(
- Conditions.key(key).value().eq(null),
+ Conditions.notExists(key),
Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
Operations.noop()).thenCompose(res ->
- res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0])
+ res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId), new byte[0])
.thenApply(v -> true)
: CompletableFuture.completedFuture(false)));
}
Set<String> tablesToStop = ctx.oldValue().namedListKeys() == null ?
- Collections.EMPTY_SET : ctx.oldValue().namedListKeys();
+ Collections.emptySet() : ctx.oldValue().namedListKeys();
tablesToStop.removeAll(ctx.newValue().namedListKeys());
@@ -262,12 +263,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
UUID tblId = t.internalTable().tableId();
- var key = new Key(INTERNAL_PREFIX + "assignment." + tblId.toString());
+ var key = new Key(INTERNAL_PREFIX + "assignment." + tblId);
futs.add(metaStorageMgr.invoke(
- Conditions.key(key).value().ne(null),
+ Conditions.value(key).ne(null),
Operations.remove(key),
Operations.noop()).thenCompose(res ->
- res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId.toString()))
+ res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId))
.thenApply(v -> true)
: CompletableFuture.completedFuture(false)));
}