You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/03/03 10:00:44 UTC
[ignite-3] branch ignite-14198 updated: IGNITE-14918 Meta storage
interface: batch operations, watches, conditional updates
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch ignite-14198
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-14198 by this push:
new 93b8672 IGNITE-14918 Meta storage interface: batch operations, watches, conditional updates
93b8672 is described below
commit 93b8672c38c9806d5aed6c3b65b563074551700a
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Wed Mar 3 12:59:35 2021 +0300
IGNITE-14918 Meta storage interface: batch operations, watches, conditional updates
---
.../org/apache/ignite/metastorage/Example.java | 56 ++++
.../metastorage/client/MetaStorageService.java | 300 +++++++++++++++++----
.../metastorage/common/AbstractCondition.java | 77 ------
.../metastorage/common/CompactedException.java | 59 ++++
.../ignite/metastorage/common/Condition.java | 132 ++++++++-
.../common/{ConditionType.java => Conditions.java} | 23 +-
.../apache/ignite/metastorage/common/Entry.java | 124 +--------
...ndition.java => OperationTimeoutException.java} | 42 +--
.../ignite/metastorage/common/PutUpdate.java | 47 ----
.../metastorage/common/RevisionCondition.java | 48 ----
.../apache/ignite/metastorage/common/Update.java | 35 ++-
.../common/{RemoveUpdate.java => Updates.java} | 28 +-
12 files changed, 570 insertions(+), 401 deletions(-)
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/Example.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/Example.java
new file mode 100644
index 0000000..484ed3e
--- /dev/null
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/Example.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.metastorage;
+
+import org.apache.ignite.metastorage.client.MetaStorageService;
+
+import static org.apache.ignite.metastorage.common.Conditions.revision;
+import static org.apache.ignite.metastorage.common.Conditions.value;
+import static org.apache.ignite.metastorage.common.Updates.noop;
+import static org.apache.ignite.metastorage.common.Updates.put;
+import static org.apache.ignite.metastorage.common.Updates.remove;
+
+public class Example {
+
+ /**
+ * Usage of conditional update example.
+ * @param args
+ */
+ @SuppressWarnings("ConstantConditions")
+ public static void main(String[] args) {
+ MetaStorageService srv = getMetaStorageService();
+
+ byte[] key = "key".getBytes();
+ byte[] val = "val".getBytes();
+ byte[] newVal = "newVal".getBytes();
+
+ srv.update(key, revision().less(10), put(val), remove());
+ srv.update(key, value().equal(newVal), put(val), noop());
+
+ // Soon...
+ // srv.update(revision(key).less(10), put(key, val), remove(key));
+ // srv.update(value(key).equal(newVal), put(key, val), noop());
+
+ }
+
+ /**
+ * @return Meta storage service.
+ */
+ public static MetaStorageService getMetaStorageService() {
+ return null;
+ }
+}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
index 178f633..c18f481 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
@@ -18,138 +18,326 @@
package org.apache.ignite.metastorage.client;
+import org.apache.ignite.metastorage.common.CompactedException;
import org.apache.ignite.metastorage.common.Condition;
import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.OperationTimeoutException;
import org.apache.ignite.metastorage.common.Update;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
+import java.util.Collection;
import java.util.List;
-import java.util.concurrent.Future;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
+import java.util.stream.Stream;
/**
* Defines interface for access to a metastorage service.
*/
public interface MetaStorageService {
/**
- * Returns metastorage revision.
+ * Retrieves an entry for the given key.
*
- * @return Metastorage revision.
+ * @param key Key. Couldn't be {@code null}.
+ * @return An entry for the given key. Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Entry
*/
@NotNull
- Future<Long> revision();
+ CompletableFuture<Entry> get(@NotNull byte[] key);
/**
- * Updates entry with given key and value.
+ * Retrieves an entry for the given key and the revision upper bound.
*
- * @param key Key. Couldn't be {@code null}.
- * @param value Value.Couldn't be {@code null}.
- * @return A previous entry which could be regular, empty or tombstone. Couldn't be {@code null}.
+ * @param key The key. Couldn't be {@code null}.
+ * @param revUpperBound The upper bound for entry revisions. Must be positive.
+ * @return An entry for the given key and maximum revision limited by {@code revUpperBound}.
+ * Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * Will be thrown on getting future result.
* @see Entry
*/
@NotNull
- Future<Entry> put(@NotNull byte[] key, @NotNull byte[] value);
+ CompletableFuture<Entry> get(@NotNull byte[] key, long revUpperBound);
/**
- * Retrieves entry with a given key.
+ * Retrieves entries for given keys.
*
- * @param key Key. Couldn't be {@code null}.
- * @return An entry for given key or an empty/tombstone entry. Couldn't be {@code null}.
+ * @param keys The collection of keys. Couldn't be {@code null} or empty.
+ * Collection elements couldn't be {@code null}.
+ * @return A list of entries for given keys. The order of entries in the result list corresponds to
+ * the traversal order of {@code keys} collection. Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
* @see Entry
*/
@NotNull
- Future<Entry> get(@NotNull byte[] key);
+ CompletableFuture<List<Entry>> getAll(Collection<byte[]> keys);
/**
- * Retrieves entry with a given key and a revision.
+ * Retrieves entries for given keys and the revision upper bound.
*
- * @param key Key. Couldn't be {@code null}.
- * @param rev Revision. Must be positive.
- * @return An entry for given key and a revision or an empty/tombstone entry. Couldn't be {@code null}.
+ * @param keys The collection of keys. Couldn't be {@code null} or empty.
+ * Collection elements couldn't be {@code null}.
+ * @param revUpperBound The upper bound for entry revisions. Must be positive.
+ * @return A list of entries for given keys and maximum revision limited by {@code revUpperBound}.
+ * The order of entries in the result list corresponds to the traversal order of {@code keys} collection.
+ * Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * Will be thrown on getting future result.
* @see Entry
*/
- //TODO: Is it really needed???
@NotNull
- Future<Entry> get(@NotNull byte[] key, long rev);
+ CompletableFuture<List<Entry>> getAll(Collection<byte[]> keys, long revUpperBound);
/**
- * Removes an entry with a given key.
+ * Inserts or updates an entry with the given key and the given value.
*
- * @param key Key. Couldn't be {@code null}.
- * @return A previous entry which could be regular, empty or tombstone. Couldn't be {@code null}.
+ * @param key The key. Couldn't be {@code null}.
+ * @param value The value.Couldn't be {@code null}.
+ * @return Completed future.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
* @see Entry
*/
@NotNull
- Future<Entry> remove(@NotNull byte[] key);
+ CompletableFuture<Void> put(@NotNull byte[] key, @NotNull byte[] value);
/**
- * Updates entry conditionally.
+ * Inserts or updates an entry with the given key and the given value and
+ * retrieves a previous entry for the given key.
*
- * @param key Key. Couldn't be {@code null}.
- * @param condition Condition.
- * @param success Update which will be applied in case of condition success.
- * @param failure Update which will be applied in case of condition failure.
- * @return A previous entry which could be regular, empty or tombstone. Couldn't be {@code null}.
+ * @param key The key. Couldn't be {@code null}.
+ * @param value The value.Couldn't be {@code null}.
+ * @return A previous entry for the given key. Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Entry
+ */
+ @NotNull
+ CompletableFuture<Entry> getAndPut(@NotNull byte[] key, @NotNull byte[] value);
+
+ /**
+ * Inserts or updates entries with given keys and given values.
+ * Size of {@code keys} and {@code values} must be the same.
+ *
+ * @param keys The list of keys. Couldn't be {@code null} or empty.
+ * @param values The list of values corresponding to the list of keys. Couldn't be {@code null} or empty.
+ * @return Completed future.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Entry
+ */
+ @NotNull
+ CompletableFuture<Void> putAll(@NotNull List<byte[]> keys, @NotNull List<byte[]> values);
+
+ /**
+ * Inserts or updates entries with given keys and given values and
+ * retrieves a previous entries for given keys.
+ * Size of {@code keys} and {@code values} must be the same.
+ *
+ * @param keys The list of keys. Couldn't be {@code null} or empty.
+ * @param values The list of values corresponding to the list of keys. Couldn't be {@code null} or empty.
+ * @return A list of entries for given keys. Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Entry
+ */
+ @NotNull
+ CompletableFuture<List<Entry>> getAndPutAll(@NotNull List<byte[]> keys, @NotNull List<byte[]> values);
+
+ /**
+ * Removes an entry for the given key.
+ *
+ * @param key The key. Couldn't be {@code null}.
+ * @return Completed future.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Entry
+ */
+ @NotNull
+ CompletableFuture<Void> remove(@NotNull byte[] key);
+
+ /**
+ * Removes an entry for the given key.
+ *
+ * @param key The key. Couldn't be {@code null}.
+ * @return A previous entry for the given key. Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Entry
+ */
+ @NotNull
+ CompletableFuture<Void> getAndRemove(@NotNull byte[] key);
+
+ /**
+ * Removes entries for given keys.
+ *
+ * @param key The key. Couldn't be {@code null}.
+ * @return Completed future.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Entry
+ */
+ @NotNull
+ CompletableFuture<Void> removeAll(@NotNull Collection<byte[]> key);
+
+ /**
+ * Removes entries for given keys and retrieves previous entries.
+ *
+ * @param key The key. Couldn't be {@code null}.
+ * @return A list of previous entries for given keys..
+ * The order of entries in the result list corresponds to the traversal order of {@code keys} collection.
+ * Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @see Entry
+ */
+ @NotNull
+ CompletableFuture<List<Entry>> getAndRemoveAll(@NotNull Collection<byte[]> key);
+
+
+ /**
+ * Updates an entry for the given key conditionally.
+ *
+ * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
+ *
+ * @param key The key. Couldn't be {@code null}.
+ * @param condition The condition.
+ * @param success The update which will be applied in case of condition evaluation yields {@code true}.
+ * @param failure The update which will be applied in case of condition evaluation yields {@code false}.
+ * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
* @see Entry
+ * @see Condition
+ * @see Update
*/
- Future<Entry> update(@NotNull byte[] key, Condition condition, Update success, Update failure);
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
+ @NotNull
+ CompletableFuture<Boolean> update(@NotNull byte[] key, @NotNull Condition condition,
+ @NotNull Update success, @NotNull Update failure);
+
/**
- * Updates multiple entries conditionally.
+ * Updates an entry for the given key conditionally.
+ *
+ * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p>
*
- * @param keys List of keys.
- * @param condition List of conditions corresponding to keys list.
- * @param success List of updates which will be applied to corresponding key in case of condition success.
- * @param failure List of updates which will be applied to corresponding key in case of condition failure.
- * @return A List of previous entries corresponding to list of keys, where each entry could be regular,
- * empty or tombstone. Couldn't be {@code null}.
+ * @param key The key. Couldn't be {@code null}.
+ * @param condition The condition.
+ * @param success The update which will be applied in case of condition evaluation yields {@code true}.
+ * @param failure The update which will be applied in case of condition evaluation yields {@code false}.
+ * @return A previous entry for the given key.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
* @see Entry
+ * @see Condition
+ * @see Update
*/
- // TODO: If I understand correctly, we always use one success and one failure condition for each key in transaction. May be I'm wrong.
- // TODO: Probably, we should provide no-op conditions also (e.g. for implementation if-then only logic).
- Future<List<Entry>> update(List<byte[]> keys, List<Condition> condition, List<Update> success, List<Update> failure);
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update.
+ @NotNull
+ CompletableFuture<Entry> getAndUpdate(@NotNull byte[] key, @NotNull Condition condition,
+ @NotNull Update success, @NotNull Update failure);
/**
- * Retrieves entries for a given key range in lexicographic order.
- * Only entries with the latest revisions will be returned.
+ * Retrieves entries for the given key range in lexicographic order. Entries will be filtered out by upper bound
+ * of given revision number.
*
* @param keyFrom Start key of range (inclusive). Couldn't be {@code null}.
* @param keyTo End key of range (exclusive). Could be {@code null}.
- * @param consumer Entry consumer which will be invoked for each entry. Entry couldn't be {@code null}.
- * @return Future which will be completed when iteration will be finished. Couldn't be {@code null}.
+ * @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision.
+ * @return Stream of entries corresponding to the given range and revision.
+ * @throws OperationTimeoutException If the operation is timed out.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * @see Entry
*/
@NotNull
- Future<Void> iterate(@NotNull byte[] keyFrom, @Nullable byte[] keyTo, @NotNull Consumer<Entry> consumer);
+ Stream<Entry> range(@NotNull byte[] keyFrom, @Nullable byte[] keyTo, long revUpperBound);
/**
- * Creates watcher on metastorage updates with given parameters.
+ * Retrieves entries for the given key range in lexicographic order. Short cut for
+ * {@link #range(byte[], byte[], long)} where {@code revUpperBound == -1}.
+ *
+ * @param keyFrom Start key of range (inclusive). Couldn't be {@code null}.
+ * @param keyTo End key of range (exclusive). Could be {@code null}.
+ * @return Stream of entries corresponding to the given range and revision.
+ * @throws OperationTimeoutException If the operation is timed out.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * @see Entry
+ */
+ @NotNull
+ Stream<Entry> range(@NotNull byte[] keyFrom, @Nullable byte[] keyTo);
+
+ /**
+ * Subscribes on meta storage updates matching the parameters.
*
* @param keyFrom Start key of range (inclusive). Could be {@code null}.
* @param keyTo End key of range (exclusive). Could be {@code null}.
- * @param revision Start revision.
- * @param consumer Entry consumer which will be invoked for each update. Entry couldn't be {@code null}.
- * @return Watch identifier.
+ * @param revision Start revision inclusive. {@code 0} - all revision,
+ * {@code -1} - latest revision (accordingly to current meta storage state).
+ * @param consumer Entries consumer which will be invoked for each update. Entry couldn't be {@code null}.
+ * @return Subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * Will be thrown on getting future result.
+ * @throws WatchTerminatedException
+ * @see Entry
*/
@NotNull
- Future<Long> watch(@Nullable byte[] keyFrom, @Nullable byte[] keyTo, long revision,
- @NotNull BiConsumer<Entry, Entry> consumer);
+ //TODO: Q: WatchTerminatedException???
+ CompletableFuture<UUID> watch(@Nullable byte[] keyFrom, @Nullable byte[] keyTo, long revision,
+ @NotNull BiConsumer<List<Entry>, List<Entry>> consumer);
+
+ /**
+ * Subscribes on meta storage updates for the given key.
+ *
+ * @param key The target key. Could be {@code null}.
+ * @param revision Start revision inclusive. {@code 0} - all revision,
+ * {@code -1} - latest revision (accordingly to current meta storage state).
+ * @param consumer Entries consumer which will be invoked for each update. Entry couldn't be {@code null}.
+ * @return Subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * Will be thrown on getting future result.
+ * @throws WatchTerminatedException
+ * @see Entry
+ */
+ @NotNull
+ //TODO: Q: WatchTerminatedException???
+ CompletableFuture<UUID> watch(@NotNull byte[] key, long revision,
+ @NotNull Consumer<Entry> consumer);
+
+ /**
+ * Subscribes on meta storage updates for given keys.
+ *
+ * @param keys Collection of target keys. Could be {@code null}.
+ * @param revision Start revision inclusive. {@code 0} - all revision,
+ * {@code -1} - latest revision (accordingly to current meta storage state).
+ * @param consumer Entries consumer which will be invoked for each update. Entry couldn't be {@code null}.
+ * @return Subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
+ * @throws CompactedException If the desired revisions are removed from the storage due to a compaction.
+ * Will be thrown on getting future result.
+ * @throws WatchTerminatedException
+ * @see Entry
+ */
+ @NotNull
+ //TODO: Q: WatchTerminatedException???
+ CompletableFuture<UUID> watch(@NotNull Collection<byte[]> keys, long revision,
+ @NotNull Consumer<List<Entry>> consumer);
+
/**
- * Stops watch with a given identifier.
+ * Cancels subscription for the given identifier.
*
- * @param watchId Watch identifier.
+ * @param id Subscription identifier.
* @return Completed future in case of operation success. Couldn't be {@code null}.
+ * * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
*/
@NotNull
- Future<Void> stopWatch(long watchId);
+ CompletableFuture<Void> stopWatch(@NotNull UUID id);
/**
- * Compacts metastorage (removes all tombstone entries and old entries except of entries with latest revision).
+ * Compacts meta storage (removes all tombstone entries and old entries except of entries with latest revision).
*
* @return Completed future. Couldn't be {@code null}.
+ * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
*/
- Future<Void> compact();
+ CompletableFuture<Void> compact();
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/AbstractCondition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/AbstractCondition.java
deleted file mode 100644
index 9bfb570..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/AbstractCondition.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.metastorage.common;
-
-import static org.apache.ignite.metastorage.common.ConditionType.EQUAL;
-import static org.apache.ignite.metastorage.common.ConditionType.GREATER;
-import static org.apache.ignite.metastorage.common.ConditionType.LESS;
-import static org.apache.ignite.metastorage.common.ConditionType.NOT_EQUAL;
-
-/**
- * Abstract condition which is designed for a comparison based conditions.
- *
- * <p>The comparison type is defined by {@link ConditionType} enumeration. Only {@link #compare(Entry)} method
- * should be implemented for getting correct comparison based condition evaluation.</p>
- */
-public abstract class AbstractCondition implements Condition {
- /** Condition type. */
- private final ConditionType type;
-
- /**
- * Constructor.
- *
- * @param type Condition type.
- */
- public AbstractCondition(ConditionType type) {
- this.type = type;
- }
-
- /**
- * Returns condition type for this condition.
- *
- * @return Condition type.
- */
- protected ConditionType conditionType() {
- return type;
- }
-
- /**
- * Evaluates comparison based condition.
- *
- * @param e Entry which is a subject of conditional update.
- * @return {@code True} if condition is successful, otherwise - {@code false}.
- */
- @Override public boolean eval(Entry e) {
- int res = compare(e);
-
- ConditionType type = conditionType();
-
- return (type == EQUAL && res == 0) ||
- (type == NOT_EQUAL && res != 0) ||
- (type == LESS && res < 0) ||
- (type == GREATER && res > 0);
- }
-
- /**
- * This abstract method should implement comparison logic based on {@link java.util.Comparator} contract.
- *
- * @param e Entry.
- * @return Comparison result as defined {@link java.util.Comparator} contract.
- */
- abstract protected int compare(Entry e);
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/CompactedException.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/CompactedException.java
new file mode 100644
index 0000000..6ea9f43
--- /dev/null
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/CompactedException.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.metastorage.common;
+
+/**
+ * Thrown when a requested operation on meta storage could not be performed because target revisions were removed
+ * from storage due to a compaction procedure. In such case the operation should be retried with actual revision.
+ */
+public class CompactedException extends RuntimeException {
+ /**
+ * Constructs an exception.
+ */
+ public CompactedException() {
+ super();
+ }
+
+ /**
+ * Constructs an exception with a given message.
+ *
+ * @param message Detail message.
+ */
+ public CompactedException(String message) {
+ super(message);
+ }
+
+ /**
+ * Constructs an exception with a given message and a cause.
+ *
+ * @param message Detail message.
+ * @param cause Cause.
+ */
+ public CompactedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * Constructs an exception with a given cause.
+ *
+ * @param cause Cause.
+ */
+ public CompactedException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
index e1a81ed..6fa3758 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java
@@ -17,15 +17,129 @@
package org.apache.ignite.metastorage.common;
+import java.util.Arrays;
+
/**
- * Defines condition interface for metastorage conditional update.
+ * Represents condition for conditional update.
*/
-public interface Condition {
- /**
- * Should implements logic for condition evaluation.
- *
- * @param e Entry which is a subject of conditional update.
- * @return {@code True} if condition is successful, otherwise - {@code false}.
- */
- boolean eval(Entry e);
+public final class Condition {
+ private final InnerCondition cnd;
+
+ Condition(InnerCondition cnd) {
+ this.cnd = cnd;
+ }
+
+ public boolean test(Entry e) {
+ return cnd.test(e);
+ }
+
+ public static final class RevisionCondition implements InnerCondition {
+ private Type type;
+ private long rev;
+
+ RevisionCondition() {
+ // No-op.
+ }
+
+ public Condition equal(long rev) {
+ validate();
+
+ this.type = Type.EQUAL;
+ this.rev = rev;
+
+ return new Condition(this);
+ }
+
+ public Condition notEqual(long rev) {
+ validate();
+
+ this.type = Type.NOT_EQUAL;
+ this.rev = rev;
+
+ return new Condition(this);
+ }
+
+ public Condition greater(long rev) {
+ validate();
+
+ this.type = Type.GREATER;
+ this.rev = rev;
+
+ return new Condition(this);
+ }
+
+ public Condition less(long rev) {
+ validate();
+
+ this.type = Type.LESS;
+ this.rev = rev;
+
+ return new Condition(this);
+ }
+
+ @Override public boolean test(Entry e) {
+ int res = Long.compare(e.revision(), rev);
+
+ return (type == Type.EQUAL && res == 0) ||
+ (type == Type.NOT_EQUAL && res != 0) ||
+ (type == Type.GREATER && res > 0) ||
+ (type == Type.LESS && res < 0);
+ }
+
+ private void validate() {
+ if (type != null)
+ throw new IllegalStateException("Condition type " + type.name() + " is already defined.");
+ }
+
+ enum Type {
+ EQUAL, NOT_EQUAL, GREATER, LESS;
+ }
+ }
+
+ public static final class ValueCondition implements InnerCondition {
+ private Type type;
+ private byte[] val;
+
+ ValueCondition() {
+ // No-op.
+ }
+
+ public Condition equal(byte[] val) {
+ validate();
+
+ this.type = Type.EQUAL;
+ this.val = val;
+
+ return new Condition(this);
+ }
+
+ public Condition notEqual(byte[] val) {
+ validate();
+
+ this.type = Type.NOT_EQUAL;
+ this.val = val;
+
+ return new Condition(this);
+ }
+
+ @Override public boolean test(Entry e) {
+ int res = Arrays.compare(e.value(), val);
+
+ return (type == Type.EQUAL && res == 0) ||
+ (type == Type.NOT_EQUAL && res != 0);
+ }
+
+ private void validate() {
+ if (type != null)
+ throw new IllegalStateException("Condition type " + type.name() + " is already defined.");
+ }
+
+ enum Type {
+ EQUAL, NOT_EQUAL
+ }
+ }
+
+ private interface InnerCondition {
+ boolean test(Entry e);
+ }
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ConditionType.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
similarity index 73%
rename from modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ConditionType.java
rename to modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
index 16383d2..08e5326 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ConditionType.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java
@@ -17,19 +17,16 @@
package org.apache.ignite.metastorage.common;
-/**
- * Defines available condition types.
- */
-enum ConditionType {
- /** Equal to smth. */
- EQUAL,
-
- /** Not equal to smth. */
- NOT_EQUAL,
+public final class Conditions {
+ public static Condition.RevisionCondition revision() {
+ return new Condition.RevisionCondition();
+ }
- /** Greater than smth. */
- GREATER,
+ public static Condition.ValueCondition value() {
+ return new Condition.ValueCondition();
+ }
- /** Less than smth. */
- LESS
+ private Conditions() {
+ // No-op.
+ }
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java
index 719809f..b01128e 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java
@@ -27,136 +27,26 @@ import org.jetbrains.annotations.Nullable;
* <ul>value - a data which is associated with a key and represented as an array of bytes.</ul>
* <ul>revision - a number which denotes a version of whole meta storage. Each change increments the revision.</ul>
* </ul>
- *
- * Instance of {@link #Entry} could represents:
- * <ul>
- * <li>A regular entry which stores a particular key, a value and a revision number.</li>
- * <li>An empty entry which denotes absence a regular entry in the meta storage for a given key.
- * A revision is 0 for such kind of entry.</li>
- * <li>A tombstone entry which denotes that a regular entry for a given key was removed from storage on some revision.</li>
- * </ul>
*/
-public class Entry {
- /** Entry key. Couldn't be {@code null}. */
- @NotNull
- final private byte[] key;
-
- /**
- * Entry value.
- * <p>
- * {@code val == null} only for {@link #empty()} and {@link #tombstone()} entries.
- * </p>
- */
- @Nullable
- final private byte[] val;
-
- /**
- * Revision number corresponding to this particular entry.
- * <p>
- * {@code rev == 0} for {@link #empty()} entry,
- * {@code rev > 0} for regular and {@link #tombstone()} entries.
- * </p>
- */
- final private long rev;
-
- /**
- * Constructor.
- *
- * @param key Key bytes. Couldn't be {@code null}.
- * @param val Value bytes. Couldn't be {@code null}.
- * @param rev Revision.
- */
- // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor.
- public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) {
- assert key != null : "key can't be null";
- assert val != null : "value can't be null";
-
- this.key = key;
- this.val = val;
- this.rev = rev;
- }
-
- /**
- * Constructor for empty and tombstone entries.
- *
- * @param key Key bytes. Couldn't be {@code null}.
- * @param rev Revision.
- */
- private Entry(@NotNull byte[] key, long rev) {
- assert key != null : "key can't be null";
-
- this.key = key;
- this.val = null;
- this.rev = rev;
- }
-
- /**
- * Creates an instance of empty entry for a given key.
- *
- * @param key Key bytes. Couldn't be {@code null}.
- * @return Empty entry.
- */
- @NotNull
- public static Entry empty(byte[] key) {
- return new Entry(key, 0);
- }
-
- /**
- * Creates an instance of tombstone entry for a given key and a revision.
- *
- * @param key Key bytes. Couldn't be {@code null}.
- * @return Empty entry.
- */
- @NotNull
- public static Entry tombstone(byte[] key, long rev) {
- assert rev > 0 : "rev must be positive for tombstone entry.";
-
- return new Entry(key, rev);
- }
-
+public interface Entry {
/**
* Returns a key.
*
- * @return Key.
+ * @return The key.
*/
- @NotNull
- public byte[] key() {
- return key;
- }
+ @NotNull byte[] key();
/**
- * Returns a value.
+ * Returns a value. Could be {@code null} for empty entry.
*
* @return Value.
*/
- @Nullable
- public byte[] value() {
- return val;
- }
+ @Nullable byte[] value();
/**
* Returns a revision.
- * @return Revision.
- */
- public long revision() {
- return rev;
- }
-
- /**
- * Returns value which denotes whether entry is tombstone or not.
*
- * @return {@code True} if entry is tombstone, otherwise - {@code false}.
- */
- public boolean tombstone() {
- return val == null && rev > 0;
- }
-
- /**
- * Returns value which denotes whether entry is empty or not.
- *
- * @return {@code True} if entry is empty, otherwise - {@code false}.
+ * @return Revision.
*/
- public boolean empty() {
- return val == null && rev == 0;
- }
+ long revision();
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ValueCondition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/OperationTimeoutException.java
similarity index 50%
rename from modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ValueCondition.java
rename to modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/OperationTimeoutException.java
index d437c87..4edcf02 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ValueCondition.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/OperationTimeoutException.java
@@ -18,32 +18,42 @@
package org.apache.ignite.metastorage.common;
/**
- * Condition, intended for value evaluation.
+ * Thrown when an operation is not executed within a specified time period. Usually in such cases the operation
+ * should be retried.
*/
-public class ValueCondition extends AbstractCondition {
- /** Value. */
- private final byte[] val;
+public class OperationTimeoutException extends RuntimeException {
+ /**
+ * Constructs an exception.
+ */
+ public OperationTimeoutException() {
+ super();
+ }
/**
- * Constructor.
+ * Constructs an exception with a given message.
*
- * @param type Condition type.
- * @param val Value for comparison.
+ * @param message Detail message.
*/
- public ValueCondition(ConditionType type, byte[] val) {
- super(type);
+ public OperationTimeoutException(String message) {
+ super(message);
+ }
- this.val = val;
+ /**
+ * Constructs an exception with a given message and a cause.
+ *
+ * @param message Detail message.
+ * @param cause Cause.
+ */
+ public OperationTimeoutException(String message, Throwable cause) {
+ super(message, cause);
}
/**
- * Compares a given value with a target one in lexicographical manner.
+ * Constructs an exception with a given cause.
*
- * @param e A target entry.
- * @return Comparison result as defined {@link java.util.Comparator} contract.
+ * @param cause Cause.
*/
- // TODO: Actually, value could be compared in different manners. So we should have possibility to define comparison logic.
- @Override protected int compare(Entry e) {
- return LexicographicComparator.INSTANCE.compare(e.value(), val);
+ public OperationTimeoutException(Throwable cause) {
+ super(cause);
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/PutUpdate.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/PutUpdate.java
deleted file mode 100644
index d1012df..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/PutUpdate.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.metastorage.common;
-
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Put (write) update.
- */
-public class PutUpdate implements Update {
- /** Key. */
- @NotNull
- private final byte[] key;
-
- /** Value. */
- @NotNull
- private final byte[] val;
-
- /**
- * Constructor.
- *
- * @param key A target key which will be updated.
- * @param val A target value which will be written for a given key.
- */
- public PutUpdate(@NotNull byte[] key, @NotNull byte[] val) {
- assert key != null : "key can't be null";
- assert val != null : "value can't be null";
-
- this.key = key;
- this.val = val;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RevisionCondition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RevisionCondition.java
deleted file mode 100644
index cd08c9c..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RevisionCondition.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.metastorage.common;
-
-/**
- * Condition, intended for revision evaluation.
- */
-public class RevisionCondition extends AbstractCondition {
- /** Revision. */
- private final long rev;
-
- /**
- * Constructor.
- *
- * @param type Condition type.
- * @param rev Revision for comparison.
- */
- public RevisionCondition(ConditionType type, long rev) {
- super(type);
-
- this.rev = rev;
- }
-
- /**
- * Compares a given revision with a target one.
- *
- * @param e A target entry.
- * @return Comparison result as defined {@link java.util.Comparator} contract.
- */
- @Override protected int compare(Entry e) {
- return Long.compare(e.revision(), rev);
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Update.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Update.java
index 58b211c..d5a6b46 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Update.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Update.java
@@ -18,7 +18,36 @@
package org.apache.ignite.metastorage.common;
/**
- * Defines update command for metastorage conditional update.
+ * Defines update command for meta storage conditional update.
*/
-public interface Update {
-}
+public final class Update {
+ private final InnerUpdate upd;
+
+ Update(InnerUpdate upd) {
+ this.upd = upd;
+ }
+
+ public static final class RemoveUpdate implements InnerUpdate {
+ RemoveUpdate() {
+ // No-op.
+ }
+ }
+
+ public static final class PutUpdate implements InnerUpdate {
+ private final byte[] val;
+
+ PutUpdate(byte[] val) {
+ this.val = val;
+ }
+ }
+
+ public static final class NoOpUpdate implements InnerUpdate {
+ NoOpUpdate() {
+ // No-op.
+ }
+ }
+
+ private interface InnerUpdate {
+ // Marker interface.
+ }
+}
\ No newline at end of file
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RemoveUpdate.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Updates.java
similarity index 70%
rename from modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RemoveUpdate.java
rename to modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Updates.java
index 09e227c..afa7cbe 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RemoveUpdate.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Updates.java
@@ -17,22 +17,20 @@
package org.apache.ignite.metastorage.common;
-import org.jetbrains.annotations.NotNull;
+public final class Updates {
+ public static Update remove() {
+ return new Update(new Update.RemoveUpdate());
+ }
-/**
- * Remove update.
- */
-public class RemoveUpdate implements Update {
- /** Key. */
- @NotNull
- private final byte[] key;
+ public static Update put(byte[] value) {
+ return new Update(new Update.PutUpdate(value));
+ }
+
+ public static Update noop() {
+ return new Update(new Update.NoOpUpdate());
+ }
- /**
- * Constructor.
- *
- * @param key A target key which will be removed.
- */
- public RemoveUpdate(@NotNull byte[] key) {
- this.key = key;
+ private Updates() {
+ // No-op.
}
}