You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/05/04 12:30:56 UTC

[ignite-3] branch main updated: IGNITE-14670 Updated metastorage service API: range with applied revision, invoke for multiple updates. Fixes #113

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new eccad97  IGNITE-14670 Updated metastorage service API: range with applied revision, invoke for multiple updates. Fixes #113
eccad97 is described below

commit eccad97b6f30818113ea822374df11116462ff5b
Author: Kirill Gusakov <kg...@gmail.com>
AuthorDate: Tue May 4 15:29:57 2021 +0300

    IGNITE-14670 Updated metastorage service API: range with applied revision, invoke for multiple updates. Fixes #113
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../ignite/internal/affinity/AffinityManager.java  |  7 ++--
 .../metastorage/client/MetaStorageService.java     | 11 +++---
 .../ignite/metastorage/common/Condition.java       | 46 ++++++++++++++--------
 .../ignite/metastorage/common/Conditions.java      | 31 +++++++++++++--
 .../ignite/metastorage/common/Operation.java       | 17 ++++++--
 .../ignite/metastorage/common/Operations.java      |  8 ++--
 .../internal/metastorage/MetaStorageManager.java   | 43 ++++++++++++++++++--
 .../apache/ignite/internal/app/IgnitionImpl.java   |  4 +-
 .../internal/table/distributed/TableManager.java   | 12 +++---
 9 files changed, 131 insertions(+), 48 deletions(-)

diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 82df3f8..37690bf 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -150,9 +150,10 @@ public class AffinityManager {
                             int replicas = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
                                 .tables().get(name).replicas().value();
 
-                            metaStorageMgr.invoke(evt.newEntry().key(),
-                                Conditions.value().eq(assignmentVal),
-                                Operations.put(ByteUtils.toBytes(
+                            var key = evt.newEntry().key();
+                            metaStorageMgr.invoke(
+                                Conditions.key(key).value().eq(assignmentVal),
+                                Operations.put(key, ByteUtils.toBytes(
                                     RendezvousAffinityFunction.assignPartitions(
                                         baselineMgr.nodes(),
                                         partitions,
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
index dfb4868..734d5e4 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
@@ -202,11 +202,10 @@ public interface MetaStorageService {
      *
      * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
      *
-     * @param key The key. Couldn't be {@code null}.
      * @param condition The condition.
-     * @param success The update which will be applied in case of condition evaluation yields {@code true}.
-     * @param failure The update which will be applied in case of condition evaluation yields {@code false}.
-     * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}.
+     * @param success Batch of updates which will be atomically applied in case of condition evaluation yields {@code true}.
+     * @param failure Batch of updates which will be atomically applied in case of condition evaluation yields {@code false}.
+     * @return Future result {@code true} if {@code success} updates were applied, otherwise {@code false}.
      * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
      * @see Key
      * @see Entry
@@ -215,8 +214,8 @@ public interface MetaStorageService {
      */
     // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
     @NotNull
-    CompletableFuture<Boolean> invoke(@NotNull Key key, @NotNull Condition condition,
-                                      @NotNull Operation success, @NotNull Operation failure);
+    CompletableFuture<Boolean> invoke(@NotNull Condition condition,
+        @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure);
 
     /**
      * Updates an entry for the given key conditionally.
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
index 359829b..54049de 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
@@ -59,16 +59,19 @@ public final class Condition {
          */
         private Type type;
 
-        /**
-         * The revision as the condition argument.
-         */
+        /** The revision as the condition argument. */
         private long rev;
 
+        /** Key of entry, which will be tested for condition. */
+        private final Key key;
+
         /**
-         * Default no-op constructor.
+         * Creates a new condition for the given {@code key}.
+         *
+         * @param key Key of entry, to be tested for the condition.
          */
-        RevisionCondition() {
-            // No-op.
+        RevisionCondition(Key key) {
+            this.key = key;
         }
 
         /**
@@ -175,9 +178,13 @@ public final class Condition {
 
         /** {@inheritDoc} */
         @Override public boolean test(Entry e) {
-            int res = Long.compare(e.revision(), rev);
+            if ((e.key() == key) || (e.key() != null && e.key().equals(key))) {
+                int res = Long.compare(e.revision(), rev);
 
-            return type.test(res);
+                return type.test(res);
+            }
+            else
+                return false;
         }
 
         /**
@@ -249,16 +256,19 @@ public final class Condition {
          */
         private Type type;
 
-        /**
-         * The value as the condition argument.
-         */
+        /** The value as the condition argument. */
         private byte[] val;
 
+        /** Key of entry, which will be tested for condition. */
+        private final Key key;
+
         /**
-         * Default no-op constructor.
+         * Creates a new condition for the given {@code key}.
+         *
+         * @param key Key of entry, to be tested for the condition.
          */
-        ValueCondition() {
-            // No-op.
+        ValueCondition(Key key) {
+            this.key = key;
         }
 
         /**
@@ -297,9 +307,13 @@ public final class Condition {
 
         /** {@inheritDoc} */
         @Override public boolean test(Entry e) {
-            int res = Arrays.compare(e.value(), val);
+            if ((e.key() == key) || (e.key() != null && e.key().equals(key))) {
+                int res = Arrays.compare(e.value(), val);
 
-            return type.test(res);
+                return type.test(res);
+            }
+            else
+               return false;
         }
 
         /**
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
index f83849a..87fa238 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
@@ -24,14 +24,27 @@ package org.apache.ignite.metastorage.common;
  * @see Condition
  */
 public final class Conditions {
+
+    /** Key. */
+    private Key key;
+
+    /**
+     * Creates new condition for entry with concrete key.
+     *
+     * @param key Key
+     */
+    private Conditions(Key key) {
+        this.key = key;
+    }
+
     /**
      * Creates condition on entry revision.
      *
      * @return Condition on entry revision.
      * @see Condition.RevisionCondition
      */
-    public static Condition.RevisionCondition revision() {
-        return new Condition.RevisionCondition();
+    public Condition.RevisionCondition revision() {
+        return new Condition.RevisionCondition(key);
     }
 
     /**
@@ -40,8 +53,18 @@ public final class Conditions {
      * @return Condition on entry value.
      * @see Condition.ValueCondition
      */
-    public static Condition.ValueCondition value() {
-        return new Condition.ValueCondition();
+    public Condition.ValueCondition value() {
+        return new Condition.ValueCondition(key);
+    }
+
+    /**
+     * Creates key-based condition.
+     *
+     * @param key Key of condition.
+     * @return Key-based condition instance.
+     */
+    public static Conditions key(Key key) {
+        return new Conditions(key);
     }
 
     /**
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
index e085a28..9810fe5 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java
@@ -38,11 +38,16 @@ public final class Operation {
      * Represents operation of type <i>remove</i>.
      */
     public static final class RemoveOp implements InnerOp {
+        /** Key. */
+        private final Key key;
+
         /**
-         * Default no-op constructor.
+         * Creates a new remove operation for the given {@code key}.
+         *
+         * @param key Key.
          */
-        RemoveOp() {
-            // No-op.
+        RemoveOp(Key key) {
+            this.key = key;
         }
     }
 
@@ -50,6 +55,9 @@ public final class Operation {
      * Represents operation of type <i>put</i>.
      */
     public static final class PutOp implements InnerOp {
+        /** Key. */
+        private final Key key;
+
         /** Value. */
         private final byte[] val;
 
@@ -58,7 +66,8 @@ public final class Operation {
          *
          * @param val The value to which the entry should be updated.
          */
-        PutOp(byte[] val) {
+        PutOp(Key key, byte[] val) {
+            this.key = key;
             this.val = val;
         }
     }
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
index 994f4bd..31c7449 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java
@@ -32,8 +32,8 @@ public final class Operations {
      *
      * @return Operation of type <i>remove</i>.
      */
-    public static Operation remove() {
-        return new Operation(new Operation.RemoveOp());
+    public static Operation remove(Key key) {
+        return new Operation(new Operation.RemoveOp(key));
     }
 
     /**
@@ -42,8 +42,8 @@ public final class Operations {
      * @param value Value.
      * @return Operation of type <i>put</i>.
      */
-    public static Operation put(byte[] value) {
-        return new Operation(new Operation.PutOp(value));
+    public static Operation put(Key key, byte[] value) {
+        return new Operation(new Operation.PutOp(key, value));
     }
 
     /**
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 4e06338..669b937 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.metastorage;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -31,11 +32,13 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteInternalException;
 import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.metastorage.client.MetaStorageService;
+import org.apache.ignite.metastorage.common.CompactedException;
 import org.apache.ignite.metastorage.common.Condition;
 import org.apache.ignite.metastorage.common.Cursor;
 import org.apache.ignite.metastorage.common.Entry;
 import org.apache.ignite.metastorage.common.Key;
 import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.OperationTimeoutException;
 import org.apache.ignite.metastorage.common.WatchListener;
 import org.apache.ignite.network.ClusterService;
 import org.jetbrains.annotations.NotNull;
@@ -303,15 +306,27 @@ import org.jetbrains.annotations.Nullable;
     }
 
     /**
-     * @see MetaStorageService#invoke(Key, Condition, Operation, Operation)
+     * Invoke with single success/failure operation.
+     *
+     * @see MetaStorageService#invoke(Condition, Collection, Collection)
      */
     public @NotNull CompletableFuture<Boolean> invoke(
-        @NotNull Key key,
         @NotNull Condition cond,
         @NotNull Operation success,
         @NotNull Operation failure
     ) {
-        return metaStorageSvc.invoke(key, cond, success, failure);
+        return metaStorageSvc.invoke(cond, Collections.singletonList(success), Collections.singletonList(failure));
+    }
+
+    /**
+     * @see MetaStorageService#invoke(Condition, Collection, Collection)
+     */
+    public @NotNull CompletableFuture<Boolean> invoke(
+        @NotNull Condition cond,
+        @NotNull Collection<Operation> success,
+        @NotNull Collection<Operation> failure
+    ) {
+        return metaStorageSvc.invoke(cond, success, failure);
     }
 
     /**
@@ -334,6 +349,28 @@ import org.jetbrains.annotations.Nullable;
     }
 
     /**
+     * Retrieves entries for the given key range in lexicographic order.
+     * Entries will be filtered out by the current applied revision as an upper bound.
+     * Applied revision is a revision of the last successful vault update.
+     *
+     * @param keyFrom Start key of range (inclusive). Couldn't be {@code null}.
+     * @param keyTo End key of range (exclusive). Could be {@code null}.
+     * @return Cursor built upon entries corresponding to the given range and applied revision.
+     * @throws OperationTimeoutException If the operation is timed out.
+     * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+     * @see Key
+     * @see Entry
+     */
+    public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull Key keyFrom, @Nullable Key keyTo) {
+        try {
+            return metaStorageSvc.range(keyFrom, keyTo, vaultMgr.appliedRevision());
+        }
+        catch (IgniteInternalCheckedException e) {
+            throw new IgniteInternalException(e);
+        }
+    }
+
+    /**
      * @see MetaStorageService#range(Key, Key)
      */
     public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 6c6b0f2..4e608eb 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -261,8 +261,8 @@ public class IgnitionImpl implements Ignition {
                 throw new UnsupportedOperationException("Metastorage service is not implemented yet");
             }
 
-            @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Key key, @NotNull Condition condition,
-                @NotNull Operation success, @NotNull Operation failure) {
+            @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition,
+                @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure) {
                 throw new UnsupportedOperationException("Metastorage service is not implemented yet");
             }
 
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1d65043..012d57b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -242,10 +242,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 UUID tblId = new UUID(revision, update);
 
+                var key = new Key(INTERNAL_PREFIX + tblId.toString());
                 futs.add(metaStorageMgr.invoke(
-                    new Key(INTERNAL_PREFIX + tblId.toString()),
-                    Conditions.value().eq(null),
-                    Operations.put(tableView.name().getBytes(StandardCharsets.UTF_8)),
+                    Conditions.key(key).value().eq(null),
+                    Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
                     Operations.noop()).thenCompose(res ->
                     res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0])
                         .thenApply(v -> true)
@@ -262,10 +262,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
                 UUID tblId = t.internalTable().tableId();
 
+                var key = new Key(INTERNAL_PREFIX + "assignment." + tblId.toString());
                 futs.add(metaStorageMgr.invoke(
-                    new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()),
-                    Conditions.value().ne(null),
-                    Operations.remove(),
+                    Conditions.key(key).value().ne(null),
+                    Operations.remove(key),
                     Operations.noop()).thenCompose(res ->
                     res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId.toString()))
                         .thenApply(v -> true)