You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/18 10:36:53 UTC
[ignite-3] 09/09: IGNITE-14389 MetaStorageService integration
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 51d3fb4f902bad06d23197a2cb78edacb91d4822
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Tue May 18 12:48:48 2021 +0300
IGNITE-14389 MetaStorageService integration
---
modules/metastorage-client/pom.xml | 6 +
.../client/ITMetaStorageServiceTest.java | 4 +-
.../internal/metastorage/client/CursorImpl.java | 14 +-
.../internal/metastorage/client/EntryImpl.java} | 78 +++++---
.../metastorage/client/MetaStorageServiceImpl.java | 197 ++++++++++++++-----
.../ignite/metastorage/client/Condition.java | 185 ++++++++----------
.../metastorage/client/MetaStorageService.java | 8 +-
.../ignite/metastorage/client/Operation.java | 71 ++++++-
.../internal/metastorage/common/ConditionType.java | 53 ++++++
.../RemoveCommand.java => OperationType.java} | 30 +--
.../{GetCommand.java => ConditionInfo.java} | 68 ++++---
.../{RemoveCommand.java => ErrorResponse.java} | 30 +--
.../metastorage/common/command/GetAllCommand.java | 25 ++-
.../common/command/GetAndPutAllCommand.java | 23 ++-
.../common/command/GetAndPutCommand.java | 6 +-
.../common/command/GetAndRemoveAllCommand.java | 18 +-
.../common/command/GetAndRemoveCommand.java | 6 +-
.../metastorage/common/command/GetCommand.java | 10 +-
.../metastorage/common/command/InvokeCommand.java | 76 ++++++++
...moveCommand.java => MultipleEntryResponse.java} | 29 +--
.../{PutCommand.java => OperationInfo.java} | 53 ++++--
.../metastorage/common/command/PutAllCommand.java | 44 ++++-
.../metastorage/common/command/PutCommand.java | 6 +-
.../metastorage/common/command/RangeCommand.java | 14 +-
.../common/command/RemoveAllCommand.java | 22 +--
.../metastorage/common/command/RemoveCommand.java | 6 +-
.../common/command/SingleEntryResponse.java | 92 +++++++++
.../common/command/WatchExactKeysCommand.java | 22 +--
.../common/command/WatchRangeKeysCommand.java | 15 +-
.../{EntryEvent.java => CompactedException.java} | 43 +++--
.../ignite/internal/metastorage/server/Entry.java | 1 -
.../internal/metastorage/server/EntryEvent.java | 2 +-
.../metastorage/server/KeyValueStorage.java | 187 +++++++++++++++---
.../internal/metastorage/server/Operation.java | 41 ++--
.../server/SimpleInMemoryKeyValueStorage.java | 2 -
.../metastorage/server/ValueCondition.java | 2 +-
.../server/raft/MetaStorageCommandListener.java | 211 +++++++++++++++------
.../server/SimpleInMemoryKeyValueStorageTest.java | 69 +++----
modules/metastorage/pom.xml | 4 +-
.../internal/metastorage/MetaStorageManager.java | 17 +-
.../internal/metastorage/WatchAggregatorTest.java | 15 +-
.../internal/table/distributed/TableManager.java | 2 +-
parent/pom.xml | 6 +
43 files changed, 1239 insertions(+), 574 deletions(-)
diff --git a/modules/metastorage-client/pom.xml b/modules/metastorage-client/pom.xml
index d45bafd..9cb157f 100644
--- a/modules/metastorage-client/pom.xml
+++ b/modules/metastorage-client/pom.xml
@@ -74,5 +74,11 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-metastorage-server</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 8601ffd..03536e2 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -68,9 +68,9 @@ import static org.mockito.Mockito.verify;
* Meta storage client tests.
*/
@SuppressWarnings("WeakerAccess")
-public class MetaStorageServiceTest {
+public class ITMetaStorageServiceTest {
/** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageServiceTest.class);
+ private static final IgniteLogger LOG = IgniteLogger.forClass(ITMetaStorageServiceTest.class);
/** Base network port. */
private static final int NODE_PORT_BASE = 20_000;
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
index dc76a7c..fb71816 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.metastorage.client;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
@@ -46,14 +47,17 @@ public class CursorImpl<T> implements Cursor<T> {
private final Iterator<T> it;
+ private final Function<Object, T> fn ;
+
/**
* @param metaStorageRaftGrpSvc Meta storage raft group service.
* @param initOp Future that runs meta storage service operation that provides cursor.
*/
- CursorImpl(RaftGroupService metaStorageRaftGrpSvc, CompletableFuture<IgniteUuid> initOp) {
+ CursorImpl(RaftGroupService metaStorageRaftGrpSvc, CompletableFuture<IgniteUuid> initOp, Function<Object, T> fn) {
this.metaStorageRaftGrpSvc = metaStorageRaftGrpSvc;
this.initOp = initOp;
- this.it = new InnerIterator();
+ this.it = new InnerIterator<>();
+ this.fn = fn;
}
/** {@inheritDoc} */
@@ -84,7 +88,7 @@ public class CursorImpl<T> implements Cursor<T> {
return it.next();
}
- private class InnerIterator implements Iterator<T> {
+ private class InnerIterator<T> implements Iterator<T> {
/** {@inheritDoc} */
@Override public boolean hasNext() {
try {
@@ -101,8 +105,8 @@ public class CursorImpl<T> implements Cursor<T> {
/** {@inheritDoc} */
@Override public T next() {
try {
- return initOp.thenCompose(
- cursorId -> metaStorageRaftGrpSvc.<T>run(new CursorNextCommand(cursorId))).get();
+ return (T)initOp.thenCompose(
+ cursorId -> metaStorageRaftGrpSvc.run(new CursorNextCommand(cursorId))).thenApply(fn).get();
}
catch (InterruptedException | ExecutionException e) {
LOG.error("Unable to evaluate cursor hasNext command", e);
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/DummyEntry.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
similarity index 58%
rename from modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/DummyEntry.java
rename to modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
index 4dd6bad..66a5328 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/DummyEntry.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
@@ -15,63 +15,79 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.common;
+package org.apache.ignite.internal.metastorage.client;
-import java.io.Serializable;
import java.util.Arrays;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.metastorage.client.Entry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
-// TODO: IGNITE-14389 Tmp, used instead of real Entry implementation. Should be removed.
/**
- * Dummy entry implementation.
+ * Meta storage entry.
*/
-public final class DummyEntry implements Serializable {
+public final class EntryImpl implements Entry {
/** Key. */
- @NotNull private ByteArray key;
+ private final ByteArray key;
/** Value. */
- @Nullable private byte[] val;
+ private final byte[] val;
/** Revision. */
- private long revision;
+ private final long rev;
/** Update counter. */
- private long updateCntr;
+ private final long updCntr;
/**
+ * Construct entry with given parameters.
*
* @param key Key.
* @param val Value.
- * @param revision Revision.
- * @param updateCntr Update counter.
+ * @param rev Revision.
+ * @param updCntr Update counter.
*/
- public DummyEntry(@NotNull ByteArray key, @Nullable byte[] val, long revision, long updateCntr) {
+ EntryImpl(ByteArray key, byte[] val, long rev, long updCntr) {
this.key = key;
this.val = val;
- this.revision = revision;
- this.updateCntr = updateCntr;
+ this.rev = rev;
+ this.updCntr = updCntr;
}
- /** {@inheritDoc} */
- public @NotNull ByteArray key() {
+ /**
+ * Returns key.
+ *
+ * @return Key.
+ */
+ @Override public @NotNull ByteArray key() {
return key;
}
- /** {@inheritDoc} */
- public @Nullable byte[] value() {
+ /**
+ * Returns value.
+ *
+ * @return Value.
+ */
+ @Override public @Nullable byte[] value() {
return val;
}
- /** {@inheritDoc} */
- public long revision() {
- return revision;
+ /**
+ * Returns revision.
+ *
+ * @return Revision.
+ */
+ @Override public long revision() {
+ return rev;
}
- /** {@inheritDoc} */
- public long updateCounter() {
- return updateCntr;
+ /**
+ * Returns update counter.
+ *
+ * @return Update counter.
+ */
+ @Override public long updateCounter() {
+ return updCntr;
}
/** {@inheritDoc} */
@@ -82,12 +98,12 @@ public final class DummyEntry implements Serializable {
if (o == null || getClass() != o.getClass())
return false;
- DummyEntry entry = (DummyEntry)o;
+ EntryImpl entry = (EntryImpl)o;
- if (revision != entry.revision)
+ if (rev != entry.rev)
return false;
- if (updateCntr != entry.updateCntr)
+ if (updCntr != entry.updCntr)
return false;
if (!key.equals(entry.key))
@@ -99,9 +115,13 @@ public final class DummyEntry implements Serializable {
/** {@inheritDoc} */
@Override public int hashCode() {
int res = key.hashCode();
+
res = 31 * res + Arrays.hashCode(val);
- res = 31 * res + (int)(revision ^ (revision >>> 32));
- res = 31 * res + (int)(updateCntr ^ (updateCntr >>> 32));
+
+ res = 31 * res + (int)(rev ^ (rev >>> 32));
+
+ res = 31 * res + (int)(updCntr ^ (updCntr >>> 32));
+
return res;
}
}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
index fbd1c17..2764acc 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
@@ -19,24 +19,30 @@ package org.apache.ignite.internal.metastorage.client;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
+import org.apache.ignite.internal.metastorage.common.OperationType;
+import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
import org.apache.ignite.internal.metastorage.common.command.GetCommand;
+import org.apache.ignite.internal.metastorage.common.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.OperationInfo;
import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
import org.apache.ignite.internal.metastorage.common.command.PutCommand;
import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
import org.apache.ignite.internal.util.Cursor;
@@ -44,12 +50,7 @@ import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.metastorage.client.MetaStorageService;
-import org.apache.ignite.metastorage.client.Condition;
-import org.apache.ignite.metastorage.client.Entry;
-import org.apache.ignite.metastorage.client.Operation;
-import org.apache.ignite.metastorage.client.WatchEvent;
-import org.apache.ignite.metastorage.client.WatchListener;
+import org.apache.ignite.metastorage.client.*;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -78,24 +79,29 @@ public class MetaStorageServiceImpl implements MetaStorageService {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) {
- return metaStorageRaftGrpSvc.run(new GetCommand(key));
+ return metaStorageRaftGrpSvc.run(new GetCommand(key)).thenApply(obj -> {
+ SingleEntryResponse resp = (SingleEntryResponse)obj;
+
+ return new EntryImpl(new ByteArray(resp.key()), resp.value(), resp.revision(), resp.updateCounter());
+ });
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key, long revUpperBound) {
- return metaStorageRaftGrpSvc.run(new GetCommand(key, revUpperBound));
+ return metaStorageRaftGrpSvc.run(new GetCommand(key, revUpperBound))
+ .thenApply(MetaStorageServiceImpl::singleEntryResult);
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Collection<ByteArray> keys) {
- return metaStorageRaftGrpSvc.<Collection<Entry>>run(new GetAllCommand(keys)).
- thenApply(entries -> entries.stream().collect(Collectors.toMap(Entry::key, Function.identity())));
+ return metaStorageRaftGrpSvc.run(new GetAllCommand(keys))
+ .thenApply(MetaStorageServiceImpl::multipleEntryResult);
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Collection<ByteArray> keys, long revUpperBound) {
- return metaStorageRaftGrpSvc.<Collection<Entry>>run(new GetAllCommand(keys, revUpperBound)).
- thenApply(entries -> entries.stream().collect(Collectors.toMap(Entry::key, Function.identity())));
+ return metaStorageRaftGrpSvc.run(new GetAllCommand(keys, revUpperBound)).
+ thenApply(MetaStorageServiceImpl::multipleEntryResult);
}
/** {@inheritDoc} */
@@ -105,7 +111,8 @@ public class MetaStorageServiceImpl implements MetaStorageService {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull ByteArray key, @NotNull byte[] value) {
- return metaStorageRaftGrpSvc.run(new GetAndPutCommand(key, value));
+ return metaStorageRaftGrpSvc.run(new GetAndPutCommand(key, value))
+ .thenApply(MetaStorageServiceImpl::singleEntryResult);
}
/** {@inheritDoc} */
@@ -115,16 +122,8 @@ public class MetaStorageServiceImpl implements MetaStorageService {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndPutAll(@NotNull Map<ByteArray, byte[]> vals) {
- List<ByteArray> keys = new ArrayList<>();
- List<byte[]> values = new ArrayList<>();
-
- vals.forEach((key, value) -> {
- keys.add(key);
- values.add(value);
- });
-
- return metaStorageRaftGrpSvc.<Collection<Entry>>run(new GetAndPutAllCommand(keys, values)).
- thenApply(entries -> entries.stream().collect(Collectors.toMap(Entry::key, Function.identity())));
+ return metaStorageRaftGrpSvc.run(new GetAndPutAllCommand(vals)).
+ thenApply(MetaStorageServiceImpl::multipleEntryResult);
}
/** {@inheritDoc} */
@@ -134,46 +133,59 @@ public class MetaStorageServiceImpl implements MetaStorageService {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull ByteArray key) {
- return metaStorageRaftGrpSvc.run(new GetAndRemoveCommand(key));
+ return metaStorageRaftGrpSvc.run(new GetAndRemoveCommand(key))
+ .thenApply(MetaStorageServiceImpl::singleEntryResult);
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<ByteArray> keys) {
+ @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) {
return metaStorageRaftGrpSvc.run(new RemoveAllCommand(keys));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Collection<ByteArray> keys) {
- return metaStorageRaftGrpSvc.<Collection<Entry>>run(new GetAndRemoveAllCommand(keys)).
- thenApply(entries -> entries.stream().collect(Collectors.toMap(Entry::key, Function.identity())));
+ @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) {
+ return metaStorageRaftGrpSvc.run(new GetAndRemoveAllCommand(keys)).
+ thenApply(MetaStorageServiceImpl::multipleEntryResult);
}
- //TODO: implement
- @Override
- public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition, @NotNull Operation success, @NotNull Operation failure) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> invoke(
+ @NotNull Condition condition,
+ @NotNull Operation success,
+ @NotNull Operation failure
+ ) {
+ return invoke(condition, List.of(success), List.of(failure));
}
- // TODO: IGNITE-14389 Implement.
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition,
- @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> invoke(
+ @NotNull Condition condition,
+ @NotNull Collection<Operation> success,
+ @NotNull Collection<Operation> failure
+ ) {
+ ConditionInfo cond = toConditionInfo(condition);
+
+ List<OperationInfo> successOps = toOperationInfos(success);
+
+ List<OperationInfo> failureOps = toOperationInfos(failure);
+
+ return metaStorageRaftGrpSvc.run(new InvokeCommand(cond, successOps, failureOps));
}
/** {@inheritDoc} */
@Override public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo, long revUpperBound) {
return new CursorImpl<>(
- metaStorageRaftGrpSvc,
- metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo, revUpperBound))
+ metaStorageRaftGrpSvc,
+ metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo, revUpperBound)),
+ MetaStorageServiceImpl::singleEntryResult
);
}
/** {@inheritDoc} */
@Override public @NotNull Cursor<Entry> range(@NotNull ByteArray keyFrom, @Nullable ByteArray keyTo) {
return new CursorImpl<>(
- metaStorageRaftGrpSvc,
- metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo))
+ metaStorageRaftGrpSvc,
+ metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo)),
+ MetaStorageServiceImpl::singleEntryResult
);
}
@@ -190,7 +202,7 @@ public class MetaStorageServiceImpl implements MetaStorageService {
watchRes.thenAccept(
watchId -> watchProcessor.addWatch(
watchId,
- new CursorImpl<>(metaStorageRaftGrpSvc, watchRes),
+ new CursorImpl<>(metaStorageRaftGrpSvc, watchRes, MetaStorageServiceImpl::watchResponse),
lsnr
)
);
@@ -219,7 +231,7 @@ public class MetaStorageServiceImpl implements MetaStorageService {
watchRes.thenAccept(
watchId -> watchProcessor.addWatch(
watchId,
- new CursorImpl<>(metaStorageRaftGrpSvc, watchRes),
+ new CursorImpl<>(metaStorageRaftGrpSvc, watchRes, MetaStorageServiceImpl::watchResponse),
lsnr
)
);
@@ -232,12 +244,105 @@ public class MetaStorageServiceImpl implements MetaStorageService {
return CompletableFuture.runAsync(() -> watchProcessor.stopWatch(id));
}
- // TODO: IGNITE-14389 Implement.
+ // TODO: IGNITE-14734 Implement.
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> compact() {
return null;
}
+ /** */
+ private static List<OperationInfo> toOperationInfos(Collection<Operation> ops) {
+ List<OperationInfo> res = new ArrayList<>(ops.size());
+
+ for (Operation op : ops) {
+ OperationInfo info = null;
+
+ if (op.type() == OperationType.NO_OP)
+ info = new OperationInfo(null, null, OperationType.NO_OP);
+ else if (op.type() == OperationType.REMOVE)
+ info = new OperationInfo(((Operation.RemoveOp)op.inner()).key(), null, OperationType.REMOVE);
+ else if (op.type() == OperationType.PUT) {
+ Operation.PutOp inner = (Operation.PutOp)op.inner();
+
+ info = new OperationInfo(inner.key(), inner.value(), OperationType.PUT);
+ }
+
+ res.add(info);
+ }
+
+ return res;
+ }
+
+ /** */
+ private static ConditionInfo toConditionInfo(@NotNull Condition condition) {
+ ConditionInfo cnd = null;
+
+ Object obj = condition.inner();
+
+ if (obj instanceof Condition.ExistenceCondition) {
+ Condition.ExistenceCondition inner = (Condition.ExistenceCondition)obj;
+
+ cnd = new ConditionInfo(inner.key(), inner.type(), null, 0);
+ }
+ else if (obj instanceof Condition.RevisionCondition) {
+ Condition.RevisionCondition inner = (Condition.RevisionCondition)obj;
+
+ cnd = new ConditionInfo(inner.key(), inner.type(), null, inner.revision());
+ }
+ else if (obj instanceof Condition.ValueCondition) {
+ Condition.ValueCondition inner = (Condition.ValueCondition)obj;
+
+ cnd = new ConditionInfo(inner.key(), inner.type(), inner.value(), 0);
+ }
+
+ return cnd;
+ }
+
+ private static Map<ByteArray, Entry> multipleEntryResult(Object obj) {
+ MultipleEntryResponse resp = (MultipleEntryResponse) obj;
+
+ Map<ByteArray, Entry> res = new HashMap<>();
+
+ for (SingleEntryResponse e : resp.entries()) {
+ ByteArray key = new ByteArray(e.key());
+
+ res.put(key, new EntryImpl(key, e.value(), e.revision(), e.updateCounter()));
+ }
+
+ return res;
+ }
+
+ private static Entry singleEntryResult(Object obj) {
+ SingleEntryResponse resp = (SingleEntryResponse) obj;
+
+ return new EntryImpl(new ByteArray(resp.key()), resp.value(), resp.revision(), resp.updateCounter());
+ }
+
+ private static WatchEvent watchResponse(Object obj) {
+ MultipleEntryResponse resp = (MultipleEntryResponse) obj;
+
+ List<EntryEvent> evts = new ArrayList<>(resp.entries().size() / 2);
+
+ Entry o = null;
+ Entry n = null;
+
+ for (int i = 0; i < resp.entries().size(); i++) {
+ SingleEntryResponse s = resp.entries().get(i);
+
+ EntryImpl e = new EntryImpl(new ByteArray(s.key()), s.value(), s.revision(), s.updateCounter());
+
+ if (i % 2 == 0)
+ o = e;
+ else {
+ n = e;
+
+ evts.add(new EntryEvent(o, n));
+ }
+ }
+
+ return new WatchEvent(evts);
+ }
+
// TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
/** Watch processor, that manages {@link Watcher} threads. */
private final class WatchProcessor {
@@ -308,8 +413,6 @@ public class MetaStorageServiceImpl implements MetaStorageService {
* in the while(true) loop. Collects watch events with same revision and fires either onUpdate or onError().
*/
@Override public void run() {
- long rev = -1;
-
Iterator<WatchEvent> watchEvtsIter = cursor.iterator();
while (true) {
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Condition.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Condition.java
index e060b8d..42f87cb 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Condition.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Condition.java
@@ -18,6 +18,7 @@
package org.apache.ignite.metastorage.client;
import java.util.Collection;
+import org.apache.ignite.internal.metastorage.common.ConditionType;
/**
* Represents a condition for meta storage conditional update.
@@ -38,19 +39,20 @@ public final class Condition {
this.cond = cond;
}
+ public InnerCondition inner() {
+ return cond;
+ }
+
+ public ConditionType type() {
+ return cond.type();
+ }
+
/**
* Represents condition on entry revision. Only one type of condition could be applied to
* the one instance of condition. Subsequent invocations of any method which produces condition will throw
* {@link IllegalStateException}.
*/
public static final class RevisionCondition extends AbstractCondition {
- /**
- * The type of condition.
- *
- * @see Type
- */
- private Type type;
-
/** The revision as the condition argument. */
private long rev;
@@ -63,130 +65,116 @@ public final class Condition {
super(key);
}
+ public long revision() {
+ return rev;
+ }
+
/**
- * Produces the condition of type {@link Type#EQUAL}. This condition tests the given revision on equality with
+ * Produces the condition of type {@link ConditionType#REV_EQUAL}. This condition tests the given revision on equality with
* target entry revision.
*
* @param rev The revision.
- * @return The condition of type {@link Type#EQUAL}.
+ * @return The condition of type {@link ConditionType#REV_EQUAL}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition eq(long rev) {
- validate(type);
+ validate(type());
+
+ type(ConditionType.REV_EQUAL);
- this.type = Type.EQUAL;
this.rev = rev;
return new Condition(this);
}
/**
- * Produces the condition of type {@link Type#NOT_EQUAL}. This condition tests the given revision on inequality
+ * Produces the condition of type {@link ConditionType#REV_NOT_EQUAL}. This condition tests the given revision on inequality
* with target entry revision.
*
* @param rev The revision.
- * @return The condition of type {@link Type#NOT_EQUAL}.
+ * @return The condition of type {@link ConditionType#REV_NOT_EQUAL}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition ne(long rev) {
- validate(type);
+ validate(type());
+
+ type(ConditionType.REV_NOT_EQUAL);
- this.type = Type.NOT_EQUAL;
this.rev = rev;
return new Condition(this);
}
/**
- * Produces the condition of type {@link Type#GREATER}. This condition tests that the target entry revision
+ * Produces the condition of type {@link ConditionType#REV_GREATER}. This condition tests that the target entry revision
* is greater than given revision.
*
* @param rev The revision.
- * @return The condition of type {@link Type#GREATER}.
+ * @return The condition of type {@link ConditionType#REV_GREATER}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition gt(long rev) {
- validate(type);
+ validate(type());
+
+ type(ConditionType.REV_GREATER);
- this.type = Type.GREATER;
this.rev = rev;
return new Condition(this);
}
/**
- * Produces the condition of type {@link Type#GREATER_OR_EQUAL}. This condition tests that the target entry
+ * Produces the condition of type {@link ConditionType#REV_GREATER_OR_EQUAL}. This condition tests that the target entry
* revision is greater than or equal to given revision.
*
* @param rev The revision.
- * @return The condition of type {@link Type#GREATER_OR_EQUAL}.
+ * @return The condition of type {@link ConditionType#REV_GREATER_OR_EQUAL}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition ge(long rev) {
- validate(type);
+ validate(type());
+
+ type(ConditionType.REV_GREATER_OR_EQUAL);
- this.type = Type.GREATER_OR_EQUAL;
this.rev = rev;
return new Condition(this);
}
/**
- * Produces the condition of type {@link Type#LESS}. This condition tests that target entry revision
+ * Produces the condition of type {@link ConditionType#REV_LESS}. This condition tests that target entry revision
* is less than the given revision.
*
* @param rev The revision.
- * @return The condition of type {@link Type#LESS}.
+ * @return The condition of type {@link ConditionType#REV_LESS}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition lt(long rev) {
- validate(type);
+ validate(type());
- this.type = Type.LESS;
+ type(ConditionType.REV_LESS);
this.rev = rev;
return new Condition(this);
}
/**
- * Produces the condition of type {@link Type#LESS_OR_EQUAL}. This condition tests that target entry revision
+ * Produces the condition of type {@link ConditionType#REV_LESS_OR_EQUAL}. This condition tests that target entry revision
* is less than or equal to the given revision.
*
* @param rev The revision.
- * @return The condition of type {@link Type#LESS_OR_EQUAL}.
+ * @return The condition of type {@link ConditionType#REV_LESS_OR_EQUAL}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition le(long rev) {
- validate(type);
+ validate(type());
+
+ type(ConditionType.REV_LESS_OR_EQUAL);
- this.type = Type.LESS_OR_EQUAL;
this.rev = rev;
return new Condition(this);
}
-
- /**
- * Defines possible condition types which can be applied to the revision.
- */
- enum Type {
- /** Equality condition type. */
- EQUAL,
-
- /** Inequality condition type. */
- NOT_EQUAL,
-
- /** Greater than condition type. */
- GREATER,
-
- /** Less than condition type. */
- LESS,
-
- /** Less than or equal to condition type. */
- LESS_OR_EQUAL,
-
- /** Greater than or equal to condition type. */
- GREATER_OR_EQUAL
- }
}
/**
@@ -195,13 +183,6 @@ public final class Condition {
* {@link IllegalStateException}.
*/
public static final class ValueCondition extends AbstractCondition {
- /**
- * The type of condition.
- *
- * @see Type
- */
- private Type type;
-
/** The value as the condition argument. */
private byte[] val;
@@ -214,50 +195,45 @@ public final class Condition {
super(key);
}
+ public byte[] value() {
+ return val;
+ }
+
/**
- * Produces the condition of type {@link Type#EQUAL}. This condition tests the given value on equality with
+ * Produces the condition of type {@link ConditionType#VAL_EQUAL}. This condition tests the given value on equality with
* target entry value.
*
* @param val The value.
- * @return The condition of type {@link Type#EQUAL}.
+ * @return The condition of type {@link ConditionType#VAL_EQUAL}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition eq(byte[] val) {
- validate(type);
+ validate(type());
+
+ type(ConditionType.VAL_EQUAL);
- this.type = Type.EQUAL;
this.val = val;
return new Condition(this);
}
/**
- * Produces the condition of type {@link Type#NOT_EQUAL}. This condition tests the given value on inequality
+ * Produces the condition of type {@link ConditionType#VAL_NOT_EQUAL}. This condition tests the given value on inequality
* with target entry value.
*
* @param val The value.
- * @return The condition of type {@link Type#NOT_EQUAL}.
+ * @return The condition of type {@link ConditionType#VAL_NOT_EQUAL}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition ne(byte[] val) {
- validate(type);
+ validate(type());
+
+ type(ConditionType.VAL_NOT_EQUAL);
- this.type = Type.NOT_EQUAL;
this.val = val;
return new Condition(this);
}
-
- /**
- * Defines possible condition types which can be applied to the value.
- */
- enum Type {
- /** Equality condition type. */
- EQUAL,
-
- /** Inequality condition type. */
- NOT_EQUAL
- }
}
/**
@@ -267,13 +243,6 @@ public final class Condition {
*/
public static final class ExistenceCondition extends AbstractCondition {
/**
- * The type of condition.
- *
- * @see Type
- */
- private Type type;
-
- /**
* Constructs a condition on existence an entry identified by the given key.
*
* @param key Identifies an entry which condition will be applied to.
@@ -283,45 +252,34 @@ public final class Condition {
}
/**
- * Produces the condition of type {@link Type#EXISTS}. This condition tests the existence of an entry
+ * Produces the condition of type {@link ConditionType#KEY_EXISTS}. This condition tests the existence of an entry
* identified by the given key.
*
- * @return The condition of type {@link Type#EXISTS}.
+ * @return The condition of type {@link ConditionType#KEY_EXISTS}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition exists() {
- validate(type);
+ validate(type());
- this.type = Type.EXISTS;
+ type(ConditionType.KEY_EXISTS);
return new Condition(this);
}
/**
- * Produces the condition of type {@link Type#NOT_EXISTS}. This condition tests the non-existence of an entry
+ * Produces the condition of type {@link ConditionType#KEY_NOT_EXISTS}. This condition tests the non-existence of an entry
* identified by the given key.
*
- * @return The condition of type {@link Type#NOT_EXISTS}.
+ * @return The condition of type {@link ConditionType#KEY_NOT_EXISTS}.
* @throws IllegalStateException In case when the condition is already defined.
*/
public Condition notExists() {
- validate(type);
+ validate(type());
- this.type = Type.NOT_EXISTS;
+ type(ConditionType.KEY_NOT_EXISTS);
return new Condition(this);
}
-
- /**
- * Defines possible condition types which can be applied to the value.
- */
- enum Type {
- /** Existence condition type. */
- EXISTS,
-
- /** Non-existence condition type. */
- NOT_EXISTS
- }
}
/**
@@ -344,6 +302,8 @@ public final class Condition {
* @return Key which identifies an entry which condition will be applied to.
*/
byte[] key();
+
+ ConditionType type();
}
/**
@@ -354,6 +314,11 @@ public final class Condition {
private final byte[] key;
/**
+ * Condition type.
+ */
+ private ConditionType type;
+
+ /**
* Constructs a condition with the given entry key.
*
* @param key Key which identifies an entry which condition will be applied to.
@@ -370,5 +335,13 @@ public final class Condition {
@Override public byte[] key() {
return key;
}
+
+ @Override public ConditionType type() {
+ return type;
+ }
+
+ protected void type(ConditionType type) {
+ this.type = type;
+ }
}
}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
index 09757c3..35a7af8 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
@@ -19,6 +19,7 @@ package org.apache.ignite.metastorage.client;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteUuid;
@@ -167,14 +168,14 @@ public interface MetaStorageService {
/**
* Removes entries for given keys.
*
- * @param keys The keys collection. Couldn't be {@code null}.
+ * @param keys The keys set. Couldn't be {@code null} or empty.
* @return Completed future.
* @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
* @see ByteArray
* @see Entry
*/
@NotNull
- CompletableFuture<Void> removeAll(@NotNull Collection<ByteArray> keys);
+ CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys);
/**
* Removes entries for given keys and retrieves previous entries.
@@ -188,8 +189,7 @@ public interface MetaStorageService {
* @see Entry
*/
@NotNull
- CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Collection<ByteArray> keys);
-
+ CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys);
/**
* Updates an entry for the given key conditionally.
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Operation.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Operation.java
index bd41824..d81d488 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Operation.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Operation.java
@@ -17,6 +17,8 @@
package org.apache.ignite.metastorage.client;
+import org.apache.ignite.internal.metastorage.common.OperationType;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -36,6 +38,20 @@ public final class Operation {
this.upd = upd;
}
+ /** Returns the wrapped inner operation. */
+ public InnerOp inner() {
+ return upd;
+ }
+
+ /**
+ * Returns operation type.
+ *
+ * @return Operation type.
+ */
+ public OperationType type() {
+ return upd.type();
+ }
+
/**
* Represents operation of type <i>remove</i>.
*/
@@ -46,7 +62,7 @@ public final class Operation {
* @param key Identifies an entry which operation will be applied to.
*/
RemoveOp(byte[] key) {
- super(key);
+ super(key, OperationType.REMOVE);
}
}
@@ -64,10 +80,19 @@ public final class Operation {
* @param val The value to which the entry should be updated.
*/
PutOp(byte[] key, byte[] val) {
- super(key);
+ super(key, OperationType.PUT);
this.val = val;
}
+
+ /**
+ * Returns value.
+ *
+ * @return Value.
+ */
+ public byte[] value() {
+ return val;
+ }
}
/**
@@ -78,7 +103,7 @@ public final class Operation {
* Default no-op constructor.
*/
NoOp() {
- super(null);
+ super(null, OperationType.NO_OP);
}
}
@@ -86,19 +111,57 @@ public final class Operation {
* Defines operation interface.
*/
private interface InnerOp {
+ /**
+ * Returns key.
+ *
+ * @return Key.
+ */
@Nullable byte[] key();
+
+ /**
+ * Returns operation type.
+ *
+ * @return Operation type.
+ */
+ @NotNull OperationType type();
}
+ /** Base {@link InnerOp} implementation that stores the key and the operation type. */
private static class AbstractOp implements InnerOp {
+ /** Key. */
@Nullable private final byte[] key;
- public AbstractOp(@Nullable byte[] key) {
+ /** Operation type. */
+ @NotNull private final OperationType type;
+
+ /**
+ * Constructs an operation with the given key and type.
+ * @param key Key.
+ * @param type Operation type.
+ */
+ public AbstractOp(@Nullable byte[] key, OperationType type) {
this.key = key;
+ this.type = type;
}
+ /**
+ * Returns key.
+ *
+ * @return Key.
+ */
@Nullable
@Override public byte[] key() {
return key;
}
+
+ /**
+ * Returns operation type.
+ *
+ * @return Operation type.
+ */
+ @NotNull
+ @Override public OperationType type() {
+ return type;
+ }
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/ConditionType.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/ConditionType.java
new file mode 100644
index 0000000..522bd8f
--- /dev/null
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/ConditionType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.common;
+
+/**
+ * Defines possible condition types which can be applied to the revision, the value or the existence of an entry.
+ */
+public enum ConditionType {
+ /** Equality condition type for revision. */
+ REV_EQUAL,
+
+ /** Inequality condition type for revision. */
+ REV_NOT_EQUAL,
+
+ /** Greater than condition type for revision. */
+ REV_GREATER,
+
+ /** Less than condition type for revision. */
+ REV_LESS,
+
+ /** Less than or equal to condition type for revision. */
+ REV_LESS_OR_EQUAL,
+
+ /** Greater than or equal to condition type for revision. */
+ REV_GREATER_OR_EQUAL,
+
+ /** Equality condition type for value. */
+ VAL_EQUAL,
+
+ /** Inequality condition type for value. */
+ VAL_NOT_EQUAL,
+
+ /** Existence condition type for key. */
+ KEY_EXISTS,
+
+ /** Non-existence condition type for key. */
+ KEY_NOT_EXISTS
+}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/OperationType.java
similarity index 53%
copy from modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
copy to modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/OperationType.java
index 18322b9..7c97859 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/OperationType.java
@@ -15,30 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.metastorage.common.command;
-
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
+package org.apache.ignite.internal.metastorage.common;
/**
- * Remove command for MetaStorageCommandListener that removes an entry for the given key.
+ * Defines possible operation types.
*/
-public final class RemoveCommand implements WriteCommand {
- /** The key. Couldn't be {@code null}. */
- @NotNull private final ByteArray key;
+public enum OperationType {
+ /** No-op operation. */
+ NO_OP,
- /**
- * @param key he key. Couldn't be {@code null}.
- */
- public RemoveCommand(@NotNull ByteArray key) {
- this.key = key;
- }
+ /** Put (insert/replace) operation. */
+ PUT,
- /**
- * @return The key. Couldn't be {@code null}.
- */
- public @NotNull ByteArray key() {
- return key;
- }
+ /** Remove operation. */
+ REMOVE
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/ConditionInfo.java
similarity index 50%
copy from modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
copy to modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/ConditionInfo.java
index c71479b..998be42 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/ConditionInfo.java
@@ -17,51 +17,73 @@
package org.apache.ignite.internal.metastorage.common.command;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.jetbrains.annotations.NotNull;
+import java.io.Serializable;
+import org.apache.ignite.internal.metastorage.common.ConditionType;
/**
- * Get command for MetaStorageCommandListener that retrieves an entry
- * for the given key and the revision upper bound, if latter is present.
+ * Defines condition for InvokeCommand.
*/
-public final class GetCommand implements ReadCommand {
+public class ConditionInfo implements Serializable {
/** Key. */
- @NotNull private final ByteArray key;
+ private final byte[] key;
- /** The upper bound for entry revisions. Must be positive. */
- private long revUpperBound;
+ /** Type. */
+ private final ConditionType type;
+
+ /** Value. */
+ private final byte[] val;
+
+ /** Revision. */
+ private final long rev;
/**
- * @param key Key. Couldn't be {@code null}.
+ * Construct condition with given parameters.
+ *
+ * @param key Key.
+ * @param type Condition type.
+ * @param val Value.
+ * @param rev Revision.
*/
- public GetCommand(@NotNull ByteArray key) {
+ public ConditionInfo(byte[] key, ConditionType type, byte[] val, long rev) {
this.key = key;
+ this.type = type;
+ this.val = val;
+ this.rev = rev;
}
/**
- * @param key Key. Couldn't be {@code null}.
- * @param revUpperBound The upper bound for entry revisions. Must be positive.
+ * Returns key.
+ *
+ * @return Key.
*/
- public GetCommand(@NotNull ByteArray key, @NotNull long revUpperBound) {
- this.key = key;
-
- assert revUpperBound > 0;
+ public byte[] key() {
+ return key;
+ }
- this.revUpperBound = revUpperBound;
+ /**
+ * Returns condition type.
+ *
+ * @return Condition type.
+ */
+ public ConditionType type() {
+ return type;
}
/**
- * @return Key.
+ * Returns value.
+ *
+ * @return Value.
*/
- public @NotNull ByteArray key() {
- return key;
+ public byte[] value() {
+ return val;
}
/**
- * @return The upper bound for entry revisions, or {@code null} if wasn't specified.
+ * Returns revision.
+ *
+ * @return Revision.
*/
public long revision() {
- return revUpperBound;
+ return rev;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/ErrorResponse.java
similarity index 59%
copy from modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
copy to modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/ErrorResponse.java
index 18322b9..8ef1744 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/ErrorResponse.java
@@ -17,28 +17,28 @@
package org.apache.ignite.internal.metastorage.common.command;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
+import java.io.Serializable;
-/**
- * Remove command for MetaStorageCommandListener that removes an entry for the given key.
- */
-public final class RemoveCommand implements WriteCommand {
- /** The key. Couldn't be {@code null}. */
- @NotNull private final ByteArray key;
+/** Defines error response. */
+public class ErrorResponse implements Serializable {
+ /** Error code. */
+ private final int errCode;
/**
- * @param key he key. Couldn't be {@code null}.
+ * Constructs error response.
+ *
+ * @param errCode Error code.
*/
- public RemoveCommand(@NotNull ByteArray key) {
- this.key = key;
+ public ErrorResponse(int errCode) {
+ this.errCode = errCode;
}
/**
- * @return The key. Couldn't be {@code null}.
+ * Returns error code.
+ *
+ * @return Error code.
*/
- public @NotNull ByteArray key() {
- return key;
+ public int errorCode() {
+ return errCode;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
index c5fcaa9..e20a793 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
@@ -17,24 +17,23 @@
package org.apache.ignite.internal.metastorage.common.command;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.ReadCommand;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
/**
* Get all command for MetaStorageCommandListener that retrieves entries
* for given keys and the revision upper bound, if latter is present.
*/
public final class GetAllCommand implements ReadCommand {
- /** The collection of keys. */
- @NotNull private final Collection<ByteArray> keys;
+ /** The list of keys. */
+ @NotNull private final List<byte[]> keys;
/** The upper bound for entry revisions. Must be positive. */
- @Nullable private Long revUpperBound;
+ private long revUpperBound;
/**
* @param keys The collection of keys. Couldn't be {@code null} or empty. Collection elements couldn't be {@code
@@ -43,10 +42,10 @@ public final class GetAllCommand implements ReadCommand {
public GetAllCommand(@NotNull Collection<ByteArray> keys) {
assert !keys.isEmpty();
- if (keys instanceof Serializable)
- this.keys = keys;
- else
- this.keys = new ArrayList<>(keys);
+ this.keys = new ArrayList<>(keys.size());
+
+ for (ByteArray key : keys)
+ this.keys.add(key.bytes());
}
/**
@@ -54,7 +53,7 @@ public final class GetAllCommand implements ReadCommand {
* null}.
* @param revUpperBound The upper bound for entry revisions. Must be positive.
*/
- public GetAllCommand(@NotNull Collection<ByteArray> keys, @NotNull Long revUpperBound) {
+ public GetAllCommand(@NotNull Collection<ByteArray> keys, long revUpperBound) {
this(keys);
assert revUpperBound > 0;
@@ -63,16 +62,16 @@ public final class GetAllCommand implements ReadCommand {
}
/**
- * @return The collection of keys.
+ * @return The list of keys.
*/
- public @NotNull Collection<ByteArray> keys() {
+ public @NotNull List<byte[]> keys() {
return keys;
}
/**
* @return The upper bound for entry revisions. Must be positive.
*/
- public @Nullable Long revision() {
+ public long revision() {
return revUpperBound;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
index d586624..cf11013 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
@@ -17,8 +17,9 @@
package org.apache.ignite.internal.metastorage.common.command;
-import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -29,27 +30,31 @@ import org.jetbrains.annotations.NotNull;
*/
public final class GetAndPutAllCommand implements WriteCommand {
/** Keys. */
- @NotNull private final List<ByteArray> keys;
+ @NotNull private final List<byte[]> keys;
/** Values. */
@NotNull private final List<byte[]> vals;
/**
- * @param keys Keys.
* @param vals Values.
*/
- public GetAndPutAllCommand(@NotNull List<ByteArray> keys, @NotNull List<byte[]> vals) {
- assert keys instanceof Serializable;
- assert vals instanceof Serializable;
+ public GetAndPutAllCommand(@NotNull Map<ByteArray, byte[]> vals) {
+ int size = vals.size();
- this.keys = keys;
- this.vals = vals;
+ this.keys = new ArrayList<>(size);
+ this.vals = new ArrayList<>(size);
+
+ for (Map.Entry<ByteArray, byte[]> e : vals.entrySet()) {
+ this.keys.add(e.getKey().bytes());
+
+ this.vals.add(e.getValue());
+ }
}
/**
* @return Keys.
*/
- public @NotNull List<ByteArray> keys() {
+ public @NotNull List<byte[]> keys() {
return keys;
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
index 9bcb8aa..26bfbe5 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.NotNull;
*/
public final class GetAndPutCommand implements WriteCommand {
/** The key. Couldn't be {@code null}. */
- @NotNull private final ByteArray key;
+ @NotNull private final byte[] key;
/** The value. Couldn't be {@code null}. */
@NotNull private final byte[] val;
@@ -37,14 +37,14 @@ public final class GetAndPutCommand implements WriteCommand {
* @param val The value. Couldn't be {@code null}.
*/
public GetAndPutCommand(@NotNull ByteArray key, @NotNull byte[] val) {
- this.key = key;
+ this.key = key.bytes();
this.val = val;
}
/**
* @return The key. Couldn't be {@code null}.
*/
- public @NotNull ByteArray key() {
+ public @NotNull byte[] key() {
return key;
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
index 5712f0b..0ff8738 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.metastorage.common.command;
-import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.List;
+import java.util.Set;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -30,24 +30,22 @@ import org.jetbrains.annotations.NotNull;
*/
public final class GetAndRemoveAllCommand implements WriteCommand {
/** The keys collection. Couldn't be {@code null}. */
- @NotNull private final Collection<ByteArray> keys;
+ @NotNull private final List<byte[]> keys;
/**
* @param keys The keys collection. Couldn't be {@code null}.
*/
- public GetAndRemoveAllCommand(@NotNull Collection<ByteArray> keys) {
- assert !keys.isEmpty();
+ public GetAndRemoveAllCommand(@NotNull Set<ByteArray> keys) {
+ this.keys = new ArrayList<>(keys.size());
- if (keys instanceof Serializable)
- this.keys = keys;
- else
- this.keys = new ArrayList<>(keys);
+ for (ByteArray key : keys)
+ this.keys.add(key.bytes());
}
/**
* @return The keys collection. Couldn't be {@code null}.
*/
- public @NotNull Collection<ByteArray> keys() {
+ public @NotNull List<byte[]> keys() {
return keys;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
index b4a4166..57288ad 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
@@ -27,19 +27,19 @@ import org.jetbrains.annotations.NotNull;
*/
public final class GetAndRemoveCommand implements WriteCommand {
/** The key. Couldn't be {@code null}. */
- @NotNull private final ByteArray key;
+ @NotNull private final byte[] key;
/**
* @param key The key. Couldn't be {@code null}.
*/
public GetAndRemoveCommand(@NotNull ByteArray key) {
- this.key = key;
+ this.key = key.bytes();
}
/**
* @return The key. Couldn't be {@code null}.
*/
- public @NotNull ByteArray key() {
+ public @NotNull byte[] key() {
return key;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
index c71479b..381a8aa 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.NotNull;
*/
public final class GetCommand implements ReadCommand {
/** Key. */
- @NotNull private final ByteArray key;
+ @NotNull private final byte[] key;
/** The upper bound for entry revisions. Must be positive. */
private long revUpperBound;
@@ -36,15 +36,15 @@ public final class GetCommand implements ReadCommand {
* @param key Key. Couldn't be {@code null}.
*/
public GetCommand(@NotNull ByteArray key) {
- this.key = key;
+ this.key = key.bytes();
}
/**
* @param key Key. Couldn't be {@code null}.
* @param revUpperBound The upper bound for entry revisions. Must be positive.
*/
- public GetCommand(@NotNull ByteArray key, @NotNull long revUpperBound) {
- this.key = key;
+ public GetCommand(@NotNull ByteArray key, long revUpperBound) {
+ this.key = key.bytes();
assert revUpperBound > 0;
@@ -54,7 +54,7 @@ public final class GetCommand implements ReadCommand {
/**
* @return Key.
*/
- public @NotNull ByteArray key() {
+ public @NotNull byte[] key() {
return key;
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
new file mode 100644
index 0000000..6981a0e
--- /dev/null
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.common.command;
+
+import org.apache.ignite.raft.client.Command;
+
+import java.util.List;
+
+/**
+ * Represents invoke command for meta storage.
+ */
+public class InvokeCommand implements Command {
+ /** Condition. */
+ private final ConditionInfo cond;
+
+ /** Success operations. */
+ private final List<OperationInfo> success;
+
+ /** Failure operations. */
+ private final List<OperationInfo> failure;
+
+ /**
+ * Constructs invoke command instance.
+ *
+ * @param cond Condition.
+ * @param success Success operations.
+ * @param failure Failure operations.
+ */
+ public InvokeCommand(ConditionInfo cond, List<OperationInfo> success, List<OperationInfo> failure) {
+ this.cond = cond;
+ this.success = success;
+ this.failure = failure;
+ }
+
+ /**
+ * Returns condition.
+ *
+ * @return Condition.
+ */
+ public ConditionInfo condition() {
+ return cond;
+ }
+
+ /**
+ * Returns success operations.
+ *
+ * @return Success operations.
+ */
+ public List<OperationInfo> success() {
+ return success;
+ }
+
+ /**
+ * Returns failure operations.
+ *
+ * @return Failure operations.
+ */
+ public List<OperationInfo> failure() {
+ return failure;
+ }
+}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultipleEntryResponse.java
similarity index 59%
copy from modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
copy to modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultipleEntryResponse.java
index 18322b9..a11a3e2 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/MultipleEntryResponse.java
@@ -17,28 +17,31 @@
package org.apache.ignite.internal.metastorage.common.command;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
+import java.io.Serializable;
+import java.util.List;
/**
- * Remove command for MetaStorageCommandListener that removes an entry for the given key.
+ * Defines response for command which returns a number of results.
*/
-public final class RemoveCommand implements WriteCommand {
- /** The key. Couldn't be {@code null}. */
- @NotNull private final ByteArray key;
+public class MultipleEntryResponse implements Serializable {
+ /** Single responses. */
+ private final List<SingleEntryResponse> entries;
/**
- * @param key he key. Couldn't be {@code null}.
+ * Constructs multiple entries response.
+ *
+ * @param entries The list of single responses.
*/
- public RemoveCommand(@NotNull ByteArray key) {
- this.key = key;
+ public MultipleEntryResponse(List<SingleEntryResponse> entries) {
+ this.entries = entries;
}
/**
- * @return The key. Couldn't be {@code null}.
+ * Returns the list of single responses.
+ *
+ * @return The list of single responses.
*/
- public @NotNull ByteArray key() {
- return key;
+ public List<SingleEntryResponse> entries() {
+ return entries;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/OperationInfo.java
similarity index 53%
copy from modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
copy to modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/OperationInfo.java
index 1efff24..21776a2 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/OperationInfo.java
@@ -17,41 +17,60 @@
package org.apache.ignite.internal.metastorage.common.command;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.internal.metastorage.common.OperationType;
+
+import java.io.Serializable;
/**
- * Put command for MetaStorageCommandListener that inserts or updates an entry
- * with the given key and the given value and retrieves a previous entry for the given key.
+ * Defines operation.
*/
-public final class PutCommand implements WriteCommand {
- /** The key. Couldn't be {@code null}. */
- @NotNull private final ByteArray key;
+public class OperationInfo implements Serializable {
+ /** Key. */
+ private final byte[] key;
+
+ /** Value. */
+ private final byte[] val;
- /** The value. Couldn't be {@code null}. */
- @NotNull private final byte[] val;
+ /** Operation type. */
+ private final OperationType type;
/**
- * @param key The key. Couldn't be {@code null}.
- * @param val The value. Couldn't be {@code null}.
+ * Constructs operation with given parameters.
+ *
+ * @param key Key.
+ * @param val Value.
+ * @param type Operation type.
*/
- public PutCommand(@NotNull ByteArray key, @NotNull byte[] val) {
+ public OperationInfo(byte[] key, byte[] val, OperationType type) {
this.key = key;
this.val = val;
+ this.type = type;
+ }
+
+ /**
+ * Returns operation type.
+ *
+ * @return Operation type.
+ */
+ public OperationType type() {
+ return type;
}
/**
- * @return The key. Couldn't be {@code null}.
+ * Returns key.
+ *
+ * @return Key.
*/
- public @NotNull ByteArray key() {
+ public byte[] key() {
return key;
}
/**
- * @return The value. Couldn't be {@code null}.
+ * Returns value.
+ *
+ * @return Value.
*/
- public @NotNull byte[] value() {
+ public byte[] value() {
return val;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
index 029ba99..fe25f90 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.metastorage.common.command;
-import java.io.Serializable;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.WriteCommand;
@@ -29,8 +29,11 @@ import org.jetbrains.annotations.NotNull;
* with given keys and given values.
*/
public final class PutAllCommand implements WriteCommand {
- /** The map of keys and corresponding values. Couldn't be {@code null} or empty. */
- @NotNull private final Map<ByteArray, byte[]> vals;
+ /** List of keys. */
+ private final List<byte[]> keys;
+
+ /** List of values. */
+ private final List<byte[]> vals;
/**
* @param vals he map of keys and corresponding values. Couldn't be {@code null} or empty.
@@ -38,16 +41,37 @@ public final class PutAllCommand implements WriteCommand {
public PutAllCommand(@NotNull Map<ByteArray, byte[]> vals) {
assert !vals.isEmpty();
- if (vals instanceof Serializable)
- this.vals = vals;
- else
- this.vals = new HashMap<>(vals);
+ int size = vals.size();
+
+ this.keys = new ArrayList<>(size);
+
+ this.vals = new ArrayList<>(size);
+
+ for (Map.Entry<ByteArray, byte[]> e : vals.entrySet()) {
+ byte[] key = e.getKey().bytes();
+
+ byte[] val = e.getValue();
+
+ assert key != null : "Key could not be null.";
+ assert val != null : "Value could not be null.";
+
+ this.keys.add(key);
+
+ this.vals.add(val);
+ }
+ }
+
+ /**
+ * @return Entries keys.
+ */
+ public @NotNull List<byte[]> keys() {
+ return keys;
}
/**
- * @return The map of keys and corresponding values. Couldn't be or empty.
+ * @return Entries values.
*/
- public @NotNull Map<ByteArray, byte[]> values() {
+ public @NotNull List<byte[]> values() {
return vals;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
index 1efff24..fe24ffe 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
@@ -27,7 +27,7 @@ import org.jetbrains.annotations.NotNull;
*/
public final class PutCommand implements WriteCommand {
/** The key. Couldn't be {@code null}. */
- @NotNull private final ByteArray key;
+ @NotNull private final byte[] key;
/** The value. Couldn't be {@code null}. */
@NotNull private final byte[] val;
@@ -37,14 +37,14 @@ public final class PutCommand implements WriteCommand {
* @param val The value. Couldn't be {@code null}.
*/
public PutCommand(@NotNull ByteArray key, @NotNull byte[] val) {
- this.key = key;
+ this.key = key.bytes();
this.val = val;
}
/**
* @return The key. Couldn't be {@code null}.
*/
- public @NotNull ByteArray key() {
+ public @NotNull byte[] key() {
return key;
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
index 8053f69..b531454 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
@@ -28,10 +28,10 @@ import org.jetbrains.annotations.Nullable;
*/
public final class RangeCommand implements WriteCommand {
/** Start key of range (inclusive). Couldn't be {@code null}. */
- @NotNull private final ByteArray keyFrom;
+ @NotNull private final byte[] keyFrom;
/** End key of range (exclusive). Could be {@code null}. */
- @Nullable private final ByteArray keyTo;
+ @Nullable private final byte[] keyTo;
/** The upper bound for entry revision. {@code -1} means latest revision. */
@NotNull private final long revUpperBound;
@@ -52,24 +52,24 @@ public final class RangeCommand implements WriteCommand {
public RangeCommand(
@NotNull ByteArray keyFrom,
@Nullable ByteArray keyTo,
- @NotNull long revUpperBound
+ long revUpperBound
) {
- this.keyFrom = keyFrom;
- this.keyTo = keyTo;
+ this.keyFrom = keyFrom.bytes();
+ this.keyTo = keyTo == null ? null : keyTo.bytes();
this.revUpperBound = revUpperBound;
}
/**
* @return Start key of range (inclusive). Couldn't be {@code null}.
*/
- public @NotNull ByteArray keyFrom() {
+ public @NotNull byte[] keyFrom() {
return keyFrom;
}
/**
* @return End key of range (exclusive). Could be {@code null}.
*/
- public @Nullable ByteArray keyTo() {
+ public @Nullable byte[] keyTo() {
return keyTo;
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
index 4edad35..b40854b 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.metastorage.common.command;
-import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.List;
+import java.util.Set;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -28,23 +28,23 @@ import org.jetbrains.annotations.NotNull;
* Remove all command for MetaStorageCommandListener that removes entries for given keys.
*/
public final class RemoveAllCommand implements WriteCommand {
- /** The keys collection. Couldn't be {@code null}. */
- @NotNull private final Collection<ByteArray> keys;
+ /** The keys list. Couldn't be {@code null}. */
+ @NotNull private final List<byte[]> keys;
/**
* @param keys The keys collection. Couldn't be {@code null}.
*/
- public RemoveAllCommand(@NotNull Collection<ByteArray> keys) {
- if (keys instanceof Serializable)
- this.keys = keys;
- else
- this.keys = new ArrayList<>(keys);
+ public RemoveAllCommand(@NotNull Set<ByteArray> keys) {
+ this.keys = new ArrayList<>(keys.size());
+
+ for (ByteArray key : keys)
+ this.keys.add(key.bytes());
}
/**
- * @return The keys collection. Couldn't be {@code null}.
+ * @return The keys list. Couldn't be {@code null}.
*/
- public @NotNull Collection<ByteArray> keys() {
+ public @NotNull List<byte[]> keys() {
return keys;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
index 18322b9..b98b829 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
@@ -26,19 +26,19 @@ import org.jetbrains.annotations.NotNull;
*/
public final class RemoveCommand implements WriteCommand {
/** The key. Couldn't be {@code null}. */
- @NotNull private final ByteArray key;
+ @NotNull private final byte[] key;
/**
* @param key he key. Couldn't be {@code null}.
*/
public RemoveCommand(@NotNull ByteArray key) {
- this.key = key;
+ this.key = key.bytes();
}
/**
* @return The key. Couldn't be {@code null}.
*/
- public @NotNull ByteArray key() {
+ public @NotNull byte[] key() {
return key;
}
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/SingleEntryResponse.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/SingleEntryResponse.java
new file mode 100644
index 0000000..678ec1a
--- /dev/null
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/SingleEntryResponse.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.common.command;
+
+import java.io.Serializable;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Defines response for command which returns exactly one result (entry).
+ */
+public class SingleEntryResponse implements Serializable {
+ /** Key. */
+ @NotNull
+ private final byte[] key;
+
+ /** Value. */
+ @Nullable
+ private final byte[] val;
+
+ /** Revision. */
+ private final long rev;
+
+ /** Update counter. */
+ private final long updCntr;
+
+ /**
+ * Constructs single entry response.
+ *
+ * @param key Key. Couldn't be {@code null}.
+ * @param val Value. Could be {@code null} for empty and tombstone entries.
+ * @param rev Revision number.
+ * @param updCntr Update counter.
+ */
+ public SingleEntryResponse(byte[] key, byte[] val, long rev, long updCntr) {
+ this.key = key;
+ this.val = val;
+ this.rev = rev;
+ this.updCntr = updCntr;
+ }
+
+ /**
+ * Returns key.
+ *
+ * @return Entry key. Couldn't be {@code null}.
+ */
+ @NotNull public byte[] key() {
+ return key;
+ }
+
+ /**
+ * Returns value.
+ *
+ * @return Entry value. Could be {@code null} for empty and tombstone entries.
+ */
+ @Nullable public byte[] value() {
+ return val;
+ }
+
+ /**
+ * Returns revision.
+ *
+ * @return Entry revision.
+ */
+ public long revision() {
+ return rev;
+ }
+
+ /**
+ * Returns update counter.
+ *
+ * @return Entry update counter.
+ */
+ public long updateCounter() {
+ return updCntr;
+ }
+}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
index 9a0c628..efde780 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
@@ -17,9 +17,9 @@
package org.apache.ignite.internal.metastorage.common.command;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.List;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -28,29 +28,29 @@ import org.jetbrains.annotations.NotNull;
* Watch command for MetaStorageCommandListener that subscribes on meta storage updates matching the parameters.
*/
public final class WatchExactKeysCommand implements WriteCommand {
- /** The keys collection. Couldn't be {@code null}. */
- @NotNull private final Collection<ByteArray> keys;
+ /** The keys list. Couldn't be {@code null}. */
+ @NotNull private final List<byte[]> keys;
/** Start revision inclusive. {@code 0} - all revisions. */
- @NotNull private final Long revision;
+ private final long revision;
/**
* @param keys The keys collection. Couldn't be {@code null}.
* @param revision Start revision inclusive. {@code 0} - all revisions.
*/
- public WatchExactKeysCommand(@NotNull Collection<ByteArray> keys, @NotNull Long revision) {
- if (keys instanceof Serializable)
- this.keys = keys;
- else
- this.keys = new ArrayList<>(keys);
+ public WatchExactKeysCommand(@NotNull Collection<ByteArray> keys, long revision) {
+ this.keys = new ArrayList<>(keys.size());
+
+ for (ByteArray key : keys)
+ this.keys.add(key.bytes());
this.revision = revision;
}
/**
- * @return The keys collection. Couldn't be {@code null}.
+ * @return The keys list. Couldn't be {@code null}.
*/
- public @NotNull Collection<ByteArray> keys() {
+ public @NotNull List<byte[]> keys() {
return keys;
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
index f164642..80e4b46 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.metastorage.common.command;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
/**
@@ -27,10 +26,10 @@ import org.jetbrains.annotations.Nullable;
*/
public final class WatchRangeKeysCommand implements WriteCommand {
/** Start key of range (inclusive). Couldn't be {@code null}. */
- @Nullable private final ByteArray keyFrom;
+ @Nullable private final byte[] keyFrom;
/** End key of range (exclusive). Could be {@code null}. */
- @Nullable private final ByteArray keyTo;
+ @Nullable private final byte[] keyTo;
/** Start revision inclusive. {@code 0} - all revisions. */
private final long revision;
@@ -51,24 +50,24 @@ public final class WatchRangeKeysCommand implements WriteCommand {
public WatchRangeKeysCommand(
@Nullable ByteArray keyFrom,
@Nullable ByteArray keyTo,
- @NotNull long revision
+ long revision
) {
- this.keyFrom = keyFrom;
- this.keyTo = keyTo;
+ this.keyFrom = keyFrom == null ? null : keyFrom.bytes();
+ this.keyTo = keyTo == null ? null : keyTo.bytes();
this.revision = revision;
}
/**
* @return Start key of range (inclusive). Couldn't be {@code null}.
*/
- public @Nullable ByteArray keyFrom() {
+ public @Nullable byte[] keyFrom() {
return keyFrom;
}
/**
* @return End key of range (exclusive). Could be {@code null}.
*/
- public @Nullable ByteArray keyTo() {
+ public @Nullable byte[] keyTo() {
return keyTo;
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
similarity index 50%
copy from modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
copy to modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
index c1e4f56..85cbf3d 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/CompactedException.java
@@ -18,41 +18,42 @@
package org.apache.ignite.internal.metastorage.server;
/**
- * Represent an update event for particular key and entry.
+ * Thrown when a requested operation on meta storage could not be performed because target revisions were removed
+ * from storage due to a compaction procedure. In such case the operation should be retried with actual revision.
*/
-public class EntryEvent {
- /** Old (previous) entry. */
- private final Entry oldEntry;
-
- /** New (current) entry. */
- private final Entry entry;
+public class CompactedException extends RuntimeException {
+ /**
+ * Constructs an exception.
+ */
+ public CompactedException() {
+ super();
+ }
/**
- * Constructs event with given old and new entries.
+ * Constructs an exception with a given message.
*
- * @param oldEntry Old entry.
- * @param curEntry New entry.
+ * @param message Detail message.
*/
- EntryEvent(Entry oldEntry, Entry curEntry) {
- this.oldEntry = oldEntry;
- this.entry = curEntry;
+ public CompactedException(String message) {
+ super(message);
}
/**
- * Returns old entry.
+ * Constructs an exception with a given message and a cause.
*
- * @return Old entry.
+ * @param message Detail message.
+ * @param cause Cause.
*/
- public Entry oldEntry() {
- return oldEntry;
+ public CompactedException(String message, Throwable cause) {
+ super(message, cause);
}
/**
- * Rreturns new entry.
+ * Constructs an exception with a given cause.
*
- * @return New entry.
+ * @param cause Cause.
*/
- public Entry entry() {
- return entry;
+ public CompactedException(Throwable cause) {
+ super(cause);
}
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
index 87b5471..22f1aec 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -36,7 +36,6 @@ import org.jetbrains.annotations.Nullable;
* <li>A tombstone entry which denotes that a regular entry for a given key was removed from storage on some revision.</li>
* </ul>
*/
-//TODO: Separate client and server entries. Empty and tombstone for client is the same.
public class Entry {
/** Entry key. Couldn't be {@code null}. */
@NotNull
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
index c1e4f56..ef0adba 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/EntryEvent.java
@@ -33,7 +33,7 @@ public class EntryEvent {
* @param oldEntry Old entry.
* @param curEntry New entry.
*/
- EntryEvent(Entry oldEntry, Entry curEntry) {
+ public EntryEvent(Entry oldEntry, Entry curEntry) {
this.oldEntry = oldEntry;
this.entry = curEntry;
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
index f46610c..d4c7da5 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java
@@ -22,54 +22,185 @@ import java.util.List;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.NotNull;
+/**
+ * Defines key/value storage interface.
+ */
public interface KeyValueStorage {
+ /**
+ * Returns storage revision.
+ *
+ * @return Storage revision.
+ */
long revision();
+ /**
+ * Returns update counter.
+ *
+ * @return Update counter.
+ */
long updateCounter();
- @NotNull
- Entry get(byte[] key);
-
- @NotNull
- Entry get(byte[] key, long rev);
-
- @NotNull
- Collection<Entry> getAll(List<byte[]> keys);
-
- @NotNull
- Collection<Entry> getAll(List<byte[]> keys, long revUpperBound);
-
- void put(byte[] key, byte[] value);
-
- @NotNull
- Entry getAndPut(byte[] key, byte[] value);
-
+ /**
+ * Returns an entry by the given key.
+ *
+ * @param key The key.
+ * @return Entry corresponding to the given key.
+ */
+ @NotNull Entry get(byte[] key);
+
+ /**
+ * Returns an entry by the given key and revision.
+ *
+ * @param key The key.
+ * @param rev The revision.
+ * @return Entry corresponding to the given key and revision.
+ */
+ @NotNull Entry get(byte[] key, long rev);
+
+ /**
+ * Returns all entries corresponding to given keys.
+ *
+ * @param keys Keys collection.
+ * @return Entries corresponding to given keys.
+ */
+ @NotNull Collection<Entry> getAll(List<byte[]> keys);
+
+ /**
+ * Returns all entries corresponding to given keys and bounded by the given revision.
+ *
+ * @param keys Keys collection.
+ * @param revUpperBound Upper bound of revision.
+ * @return Entries corresponding to given keys.
+ */
+ @NotNull Collection<Entry> getAll(List<byte[]> keys, long revUpperBound);
+
+ /**
+ * Inserts an entry with the given key and given value.
+ *
+ * @param key The key.
+ * @param value The value.
+ */
+ void put(@NotNull byte[] key, @NotNull byte[] value);
+
+ /**
+ * Inserts an entry with the given key and given value and returns previous entry.
+ *
+ * @param key The key.
+ * @param value The value.
+ * @return Previous entry corresponding to the given key.
+ */
+ @NotNull Entry getAndPut(byte[] key, byte[] value);
+
+ /**
+ * Inserts entries with given keys and given values.
+ *
+ * @param keys The key list.
+ * @param values The values list.
+ */
void putAll(List<byte[]> keys, List<byte[]> values);
- @NotNull
- Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values);
-
+ /**
+ * Inserts entries with given keys and given values and returns previous entries.
+ *
+ * @param keys The key list.
+ * @param values The values list.
+ * @return Collection of previous entries corresponding to given keys.
+ */
+ @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values);
+
+ /**
+ * Removes an entry with the given key.
+ *
+ * @param key The key.
+ */
void remove(byte[] key);
- @NotNull
- Entry getAndRemove(byte[] key);
-
- void removeAll(List<byte[]> key);
-
- @NotNull
- Collection<Entry> getAndRemoveAll(List<byte[]> keys);
-
+ /**
+ * Removes an entry with the given key and returns previous entry.
+ *
+ * @param key The key.
+ * @return Previous entry.
+ */
+ @NotNull Entry getAndRemove(byte[] key);
+
+ /**
+ * Removes all entries corresponding to given keys.
+ *
+ * @param keys The keys list.
+ */
+ void removeAll(List<byte[]> keys);
+
+ /**
+ * Removes all entries corresponding to given keys and returns previous entries.
+ *
+ * @param keys The keys list.
+ * @return Previous entries.
+ */
+ @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys);
+
+ /**
+ * Performs {@code success} operation if condition is {@code true}, otherwise performs
+ * {@code failure} operations.
+ *
+ * @param condition Condition.
+ * @param success Success operations.
+ * @param failure Failure operations.
+ * @return Result of test condition.
+ */
boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure);
+ /**
+ * Returns cursor by entries which correspond to the given keys range.
+ *
+ * @param keyFrom Start key of range (inclusive).
+ * @param keyTo Last key of range (exclusive).
+ * @return Cursor by entries which correspond to the given keys range.
+ */
Cursor<Entry> range(byte[] keyFrom, byte[] keyTo);
+ /**
+ * Returns cursor by entries which correspond to the given keys range and bounded by revision number.
+ *
+ * @param keyFrom Start key of range (inclusive).
+ * @param keyTo Last key of range (exclusive).
+ * @param revUpperBound Upper bound of revision.
+ * @return Cursor by entries which correspond to the given keys range.
+ */
Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound);
+ /**
+ * Creates subscription on updates of entries corresponding to the given keys range and starting from
+ * the given revision number.
+ *
+ * @param keyFrom Start key of range (inclusive).
+ * @param keyTo Last key of range (exclusive).
+ * @param rev Start revision number.
+ * @return Cursor by update events.
+ */
Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev);
+ /**
+ * Creates subscription on updates of entries corresponding to the given keys range (where upper bound is unlimited)
+ * and starting from the given revision number.
+ *
+ * @param key Start key of range (inclusive).
+ * @param rev Start revision number.
+ * @return Cursor by update events.
+ */
Cursor<WatchEvent> watch(byte[] key, long rev);
+ /**
+ * Creates subscription on updates of entries corresponding to the given keys collection
+ * and starting from the given revision number.
+ *
+ * @param keys Collection of keys.
+ * @param rev Start revision number.
+ * @return Cursor by update events.
+ */
Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev);
+ /**
+ * Compacts storage (removes tombstones).
+ */
void compact();
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java
index aaea75b..4c69d7d 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.metastorage.server;
import java.util.Objects;
+import org.apache.ignite.internal.metastorage.common.OperationType;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -26,19 +27,19 @@ import org.jetbrains.annotations.Nullable;
* <p>
* Invariants:
* <ul>
- * <li>Any operation identifies a target entry by not null {@code key} except of {@link Type#NO_OP}.</li>
- * <li>Only {@link Type#PUT} operation contains value which will be written to meta storage.</li>
+ * <li>Any operation identifies a target entry by not null {@code key} except of {@link OperationType#NO_OP}.</li>
+ * <li>Only {@link OperationType#PUT} operation contains value which will be written to meta storage.</li>
* </ul>
*/
-final class Operation {
+public final class Operation {
/**
- * Key identifies an entry which operation will be applied to. Key is {@code null} for {@link Type#NO_OP} operation.
+ * Key identifies an entry which operation will be applied to. Key is {@code null} for {@link OperationType#NO_OP} operation.
*/
@Nullable
private final byte[] key;
/**
- * Value which will be associated with the {@link #key}. Value is not {@code null} only for {@link Type#PUT}
+ * Value which will be associated with the {@link #key}. Value is not {@code null} only for {@link OperationType#PUT}
* operation.
*/
@Nullable
@@ -46,10 +47,10 @@ final class Operation {
/**
* Operation type.
- * @see Type
+ * @see OperationType
*/
@NotNull
- private final Type type;
+ private final OperationType type;
/**
* Constructs operation which will be applied to an entry identified by the given key.
@@ -58,10 +59,10 @@ final class Operation {
* @param key Key identifies an entry which operation will be applied to.
* @param val Value will be associated with an entry identified by the {@code key}.
*/
- Operation(@NotNull Type type, @Nullable byte[] key, @Nullable byte[] val) {
- assert (type == Type.NO_OP && key == null && val == null)
- || (type == Type.PUT && key != null && val != null)
- || (type == Type.REMOVE && key != null && val == null)
+ public Operation(@NotNull OperationType type, @Nullable byte[] key, @Nullable byte[] val) {
+ assert (type == OperationType.NO_OP && key == null && val == null)
+ || (type == OperationType.PUT && key != null && val != null)
+ || (type == OperationType.REMOVE && key != null && val == null)
: "Invalid operation parameters: [type=" + type + ", key=" + Objects.toString(key,"null") +
", val=" + Objects.toString(key,"null") + ']';
@@ -75,7 +76,7 @@ final class Operation {
*
* @return A key which identifies an entry which operation will be applied to.
*/
- @Nullable byte[] key() {
+ @Nullable public byte[] key() {
return key;
}
@@ -84,7 +85,7 @@ final class Operation {
*
* @return A value which will be associated with an entry identified by the {@code key}.
*/
- @Nullable byte[] value() {
+ @Nullable public byte[] value() {
return val;
}
@@ -93,19 +94,7 @@ final class Operation {
*
* @return An operation type.
*/
- @NotNull Type type() {
+ @NotNull public OperationType type() {
return type;
}
-
- /** Defines operation types. */
- enum Type {
- /** Put operation. */
- PUT,
-
- /** Remove operation. */
- REMOVE,
-
- /** No-op operation. */
- NO_OP
- }
}
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 1f3f0e4..d37f0bb 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -472,8 +472,6 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
// Update keysIdx.
List<Long> revs = keysIdx.computeIfAbsent(key, k -> new ArrayList<>());
- long lastRev = revs.isEmpty() ? 0 : lastRevision(revs);
-
revs.add(curRev);
Value val = new Value(bytes, curUpdCntr);
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java
index 444bfdb..f33ff9b 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java
@@ -56,7 +56,7 @@ public class ValueCondition extends AbstractCondition {
/**
* Defines possible condition types which can be applied to the value.
*/
- enum Type {
+ public enum Type {
/** Equality condition type. */
EQUAL {
@Override public boolean test(long res) {
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageCommandListener.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageCommandListener.java
index 71b4f34..df6966f 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageCommandListener.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageCommandListener.java
@@ -17,35 +17,45 @@
package org.apache.ignite.internal.metastorage.server.raft;
-import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
+import org.apache.ignite.internal.metastorage.common.ConditionType;
+import org.apache.ignite.internal.metastorage.common.command.ConditionInfo;
import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
import org.apache.ignite.internal.metastorage.common.command.GetCommand;
+import org.apache.ignite.internal.metastorage.common.command.InvokeCommand;
+import org.apache.ignite.internal.metastorage.common.command.MultipleEntryResponse;
+import org.apache.ignite.internal.metastorage.common.command.OperationInfo;
import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
import org.apache.ignite.internal.metastorage.common.command.PutCommand;
import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
+import org.apache.ignite.internal.metastorage.common.command.SingleEntryResponse;
import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
+import org.apache.ignite.internal.metastorage.server.Condition;
import org.apache.ignite.internal.metastorage.server.Entry;
+import org.apache.ignite.internal.metastorage.server.EntryEvent;
+import org.apache.ignite.internal.metastorage.server.ExistenceCondition;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.Operation;
+import org.apache.ignite.internal.metastorage.server.RevisionCondition;
+import org.apache.ignite.internal.metastorage.server.ValueCondition;
import org.apache.ignite.internal.metastorage.server.WatchEvent;
import org.apache.ignite.internal.util.Cursor;
-import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
@@ -53,7 +63,6 @@ import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
-import org.jetbrains.annotations.NotNull;
/**
* Meta storage command listener.
@@ -66,7 +75,7 @@ public class MetaStorageCommandListener implements RaftGroupCommandListener {
private final KeyValueStorage storage;
/** Cursors map. */
- private final Map<IgniteUuid, IgniteBiTuple<@NotNull Cursor<?>, @NotNull Iterator<?>>> cursors;
+ private final Map<IgniteUuid, IgniteBiTuple<Cursor<?>, CursorType>> cursors;
/**
* @param storage Storage.
@@ -85,32 +94,42 @@ public class MetaStorageCommandListener implements RaftGroupCommandListener {
if (clo.command() instanceof GetCommand) {
GetCommand getCmd = (GetCommand)clo.command();
+ Entry e;
+
if (getCmd.revision() != 0)
- clo.success(storage.get(getCmd.key().bytes(), getCmd.revision()));
+ e = storage.get(getCmd.key(), getCmd.revision());
else
- clo.success(storage.get(getCmd.key().bytes()));
+ e = storage.get(getCmd.key());
+
+ SingleEntryResponse resp = new SingleEntryResponse(
+ e.key(), e.value(), e.revision(), e.updateCounter()
+ );
+
+ clo.success(resp);
}
else if (clo.command() instanceof GetAllCommand) {
GetAllCommand getAllCmd = (GetAllCommand)clo.command();
- if (getAllCmd.revision() != null) {
- clo.success(storage.getAll(
- getAllCmd.keys().stream().map(ByteArray::bytes).collect(Collectors.toList()),
- getAllCmd.revision())
- );
- }
- else {
- clo.success(storage.getAll(
- getAllCmd.keys().stream().map(ByteArray::bytes).collect(Collectors.toList()))
- );
- }
+ Collection<Entry> entries;
+
+ if (getAllCmd.revision() != 0)
+ entries = storage.getAll(getAllCmd.keys(), getAllCmd.revision());
+ else
+ entries = storage.getAll(getAllCmd.keys());
+
+ List<SingleEntryResponse> res = new ArrayList<>(entries.size());
+
+ for (Entry e : entries)
+ res.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+
+ clo.success(new MultipleEntryResponse(res));
}
else if (clo.command() instanceof CursorHasNextCommand) {
CursorHasNextCommand cursorHasNextCmd = (CursorHasNextCommand)clo.command();
assert cursors.containsKey(cursorHasNextCmd.cursorId());
- clo.success(cursors.get(cursorHasNextCmd.cursorId()).getValue().hasNext());
+ clo.success(cursors.get(cursorHasNextCmd.cursorId()).getKey().hasNext());
}
else
assert false : "Command was not found [cmd=" + clo.command() + ']';
@@ -138,67 +157,79 @@ public class MetaStorageCommandListener implements RaftGroupCommandListener {
if (clo.command() instanceof PutCommand) {
PutCommand putCmd = (PutCommand)clo.command();
- storage.put(putCmd.key().bytes(), putCmd.value());
+ storage.put(putCmd.key(), putCmd.value());
clo.success(null);
}
else if (clo.command() instanceof GetAndPutCommand) {
GetAndPutCommand getAndPutCmd = (GetAndPutCommand)clo.command();
- clo.success(storage.getAndPut(getAndPutCmd.key().bytes(), getAndPutCmd.value()));
+ Entry e = storage.getAndPut(getAndPutCmd.key(), getAndPutCmd.value());
+
+ clo.success(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
}
else if (clo.command() instanceof PutAllCommand) {
PutAllCommand putAllCmd = (PutAllCommand)clo.command();
- storage.putAll(
- putAllCmd.values().keySet().stream().map(ByteArray::bytes).collect(Collectors.toList()),
- new ArrayList<>(putAllCmd.values().values()));
+ storage.putAll(putAllCmd.keys(), putAllCmd.values());
clo.success(null);
}
else if (clo.command() instanceof GetAndPutAllCommand) {
GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand)clo.command();
- Collection<Entry> entries = storage.getAndPutAll(
- getAndPutAllCmd.keys().stream().map(ByteArray::bytes).collect(Collectors.toList()),
- getAndPutAllCmd.vals()
- );
+ Collection<Entry> entries = storage.getAndPutAll(getAndPutAllCmd.keys(), getAndPutAllCmd.vals());
+
+ List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
- if (!(entries instanceof Serializable))
- entries = new ArrayList<>(entries);
+ for (Entry e : entries)
+ resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
- clo.success(entries);
+ clo.success(new MultipleEntryResponse(resp));
}
else if (clo.command() instanceof RemoveCommand) {
RemoveCommand rmvCmd = (RemoveCommand)clo.command();
- storage.remove(rmvCmd.key().bytes());
+ storage.remove(rmvCmd.key());
clo.success(null);
}
else if (clo.command() instanceof GetAndRemoveCommand) {
GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand)clo.command();
- clo.success(storage.getAndRemove(getAndRmvCmd.key().bytes()));
+ Entry e = storage.getAndRemove(getAndRmvCmd.key());
+
+ clo.success(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
}
else if (clo.command() instanceof RemoveAllCommand) {
RemoveAllCommand rmvAllCmd = (RemoveAllCommand)clo.command();
- storage.removeAll(rmvAllCmd.keys().stream().map(ByteArray::bytes).collect(Collectors.toList()));
+ storage.removeAll(rmvAllCmd.keys());
clo.success(null);
}
else if (clo.command() instanceof GetAndRemoveAllCommand) {
GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand)clo.command();
- Collection<Entry> entries = storage.getAndRemoveAll(
- getAndRmvAllCmd.keys().stream().map(ByteArray::bytes).collect(Collectors.toList())
- );
+ Collection<Entry> entries = storage.getAndRemoveAll(getAndRmvAllCmd.keys());
+
+ List<SingleEntryResponse> resp = new ArrayList<>(entries.size());
- if (!(entries instanceof Serializable))
- entries = new ArrayList<>(entries);
+ for (Entry e : entries)
+ resp.add(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
- clo.success(entries);
+ clo.success(new MultipleEntryResponse(resp));
+ }
+ else if (clo.command() instanceof InvokeCommand) {
+ InvokeCommand cmd = (InvokeCommand)clo.command();
+
+ boolean res = storage.invoke(
+ toCondition(cmd.condition()),
+ toOperations(cmd.success()),
+ toOperations(cmd.failure())
+ );
+
+ clo.success(res);
}
else if (clo.command() instanceof RangeCommand) {
RangeCommand rangeCmd = (RangeCommand)clo.command();
@@ -206,15 +237,12 @@ public class MetaStorageCommandListener implements RaftGroupCommandListener {
IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
Cursor<Entry> cursor = storage.range(
- rangeCmd.keyFrom().bytes(),
- rangeCmd.keyTo() == null ? null : rangeCmd.keyTo().bytes(),
+ rangeCmd.keyFrom(),
+ rangeCmd.keyTo(),
rangeCmd.revUpperBound()
);
- cursors.put(
- cursorId,
- new IgniteBiTuple<>(cursor, cursor.iterator())
- );
+ cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.RANGE));
clo.success(cursorId);
}
@@ -223,7 +251,31 @@ public class MetaStorageCommandListener implements RaftGroupCommandListener {
assert cursors.containsKey(cursorNextCmd.cursorId());
- clo.success(cursors.get(cursorNextCmd.cursorId()).getValue().next());
+ IgniteBiTuple<Cursor<?>, CursorType> cursorDesc = cursors.get(cursorNextCmd.cursorId());
+
+ if (cursorDesc.getValue() == CursorType.RANGE) {
+ Entry e = (Entry)cursorDesc.getKey().next();
+
+ clo.success(new SingleEntryResponse(e.key(), e.value(), e.revision(), e.updateCounter()));
+ }
+ else if (cursorDesc.getValue() == CursorType.WATCH) {
+ WatchEvent evt = (WatchEvent)cursorDesc.getKey().next();
+
+ List<SingleEntryResponse> resp = new ArrayList<>(evt.entryEvents().size() * 2);
+
+ for (EntryEvent e : evt.entryEvents()) {
+ Entry o = e.oldEntry();
+
+ Entry n = e.entry();
+
+ resp.add(new SingleEntryResponse(o.key(), o.value(), o.revision(), o.updateCounter()));
+
+ resp.add(new SingleEntryResponse(n.key(), n.value(), n.revision(), n.updateCounter()));
+ }
+
+ clo.success(new MultipleEntryResponse(resp));
+ }
+
}
else if (clo.command() instanceof CursorCloseCommand) {
CursorCloseCommand cursorCloseCmd = (CursorCloseCommand)clo.command();
@@ -248,15 +300,10 @@ public class MetaStorageCommandListener implements RaftGroupCommandListener {
IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
- Cursor<WatchEvent> cursor = storage.watch(
- watchCmd.keyFrom() == null ? null : watchCmd.keyFrom().bytes(),
- watchCmd.keyTo() == null ? null : watchCmd.keyTo().bytes(),
- watchCmd.revision());
+ Cursor<WatchEvent> cursor =
+ storage.watch(watchCmd.keyFrom(), watchCmd.keyTo(), watchCmd.revision());
- cursors.put(
- cursorId,
- new IgniteBiTuple<>(cursor, cursor.iterator())
- );
+ cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.WATCH));
clo.success(cursorId);
}
@@ -265,14 +312,9 @@ public class MetaStorageCommandListener implements RaftGroupCommandListener {
IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
- Cursor<WatchEvent> cursor = storage.watch(
- watchCmd.keys().stream().map(ByteArray::bytes).collect(Collectors.toList()),
- watchCmd.revision());
+ Cursor<WatchEvent> cursor = storage.watch(watchCmd.keys(), watchCmd.revision());
- cursors.put(
- cursorId,
- new IgniteBiTuple<>(cursor, cursor.iterator())
- );
+ cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.WATCH));
clo.success(cursorId);
}
@@ -292,4 +334,49 @@ public class MetaStorageCommandListener implements RaftGroupCommandListener {
}
}
}
+
+ /** Converts the {@link ConditionInfo} carried by an invoke command into the matching storage {@link Condition}; throws {@link IllegalArgumentException} for an unrecognized type. */
+ private static Condition toCondition(ConditionInfo info) {
+ byte[] key = info.key();
+
+ ConditionType type = info.type();
+
+ if (type == ConditionType.KEY_EXISTS)
+ return new ExistenceCondition(ExistenceCondition.Type.EXISTS, key);
+ else if (type == ConditionType.KEY_NOT_EXISTS)
+ return new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key);
+ else if (type == ConditionType.VAL_EQUAL)
+ return new ValueCondition(ValueCondition.Type.EQUAL, key, info.value());
+ else if (type == ConditionType.VAL_NOT_EQUAL)
+ return new ValueCondition(ValueCondition.Type.NOT_EQUAL, key, info.value());
+ else if (type == ConditionType.REV_EQUAL)
+ return new RevisionCondition(RevisionCondition.Type.EQUAL, key, info.revision());
+ else if (type == ConditionType.REV_NOT_EQUAL)
+ return new RevisionCondition(RevisionCondition.Type.NOT_EQUAL, key, info.revision());
+ else if (type == ConditionType.REV_GREATER)
+ return new RevisionCondition(RevisionCondition.Type.GREATER, key, info.revision());
+ else if (type == ConditionType.REV_GREATER_OR_EQUAL)
+ return new RevisionCondition(RevisionCondition.Type.GREATER_OR_EQUAL, key, info.revision());
+ else if (type == ConditionType.REV_LESS)
+ return new RevisionCondition(RevisionCondition.Type.LESS, key, info.revision());
+ else if (type == ConditionType.REV_LESS_OR_EQUAL)
+ return new RevisionCondition(RevisionCondition.Type.LESS_OR_EQUAL, key, info.revision());
+ else
+ throw new IllegalArgumentException("Unknown condition type [type=" + type + ']');
+ }
+
+ /** Converts each {@link OperationInfo} of an invoke command branch into a storage {@link Operation} (type, key, value), keeping the list order. */
+ private static List<Operation> toOperations(List<OperationInfo> infos) {
+ List<Operation> ops = new ArrayList<>(infos.size());
+
+ for (OperationInfo info : infos)
+ ops.add(new Operation(info.type(), info.key(), info.value()));
+
+ return ops;
+ }
+
+ /** Cursor type stored next to each registered cursor: RANGE for cursors created by range commands (yield entries), WATCH for cursors created by watch commands (yield watch events). */
+ private enum CursorType {
+ RANGE, WATCH;
+ }
}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 8e138fd..9fe8ba7 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -23,11 +23,11 @@ import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.metastorage.common.OperationType;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.ByteArray;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-
import static java.util.function.Function.identity;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,6 +37,9 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+/**
+ * Tests for in-memory meta storage implementation.
+ */
class SimpleInMemoryKeyValueStorageTest {
private KeyValueStorage storage;
@@ -967,10 +970,10 @@ class SimpleInMemoryKeyValueStorageTest {
boolean branch = storage.invoke(
new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1),
List.of(
- new Operation(Operation.Type.PUT, key1, val1_2),
- new Operation(Operation.Type.PUT, key2, val2)
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
),
- List.of(new Operation(Operation.Type.PUT, key3, val3))
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1022,10 +1025,10 @@ class SimpleInMemoryKeyValueStorageTest {
boolean branch = storage.invoke(
new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2),
- List.of(new Operation(Operation.Type.PUT, key3, val3)),
+ List.of(new Operation(OperationType.PUT, key3, val3)),
List.of(
- new Operation(Operation.Type.PUT, key1, val1_2),
- new Operation(Operation.Type.PUT, key2, val2)
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
)
);
@@ -1079,10 +1082,10 @@ class SimpleInMemoryKeyValueStorageTest {
boolean branch = storage.invoke(
new ExistenceCondition(ExistenceCondition.Type.EXISTS, key1),
List.of(
- new Operation(Operation.Type.PUT, key1, val1_2),
- new Operation(Operation.Type.PUT, key2, val2)
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
),
- List.of(new Operation(Operation.Type.PUT, key3, val3))
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1134,10 +1137,10 @@ class SimpleInMemoryKeyValueStorageTest {
boolean branch = storage.invoke(
new ExistenceCondition(ExistenceCondition.Type.EXISTS, key3),
- List.of(new Operation(Operation.Type.PUT, key3, val3)),
+ List.of(new Operation(OperationType.PUT, key3, val3)),
List.of(
- new Operation(Operation.Type.PUT, key1, val1_2),
- new Operation(Operation.Type.PUT, key2, val2)
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
)
);
@@ -1191,10 +1194,10 @@ class SimpleInMemoryKeyValueStorageTest {
boolean branch = storage.invoke(
new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key2),
List.of(
- new Operation(Operation.Type.PUT, key1, val1_2),
- new Operation(Operation.Type.PUT, key2, val2)
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
),
- List.of(new Operation(Operation.Type.PUT, key3, val3))
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1246,10 +1249,10 @@ class SimpleInMemoryKeyValueStorageTest {
boolean branch = storage.invoke(
new ExistenceCondition(ExistenceCondition.Type.NOT_EXISTS, key1),
- List.of(new Operation(Operation.Type.PUT, key3, val3)),
+ List.of(new Operation(OperationType.PUT, key3, val3)),
List.of(
- new Operation(Operation.Type.PUT, key1, val1_2),
- new Operation(Operation.Type.PUT, key2, val2)
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
)
);
@@ -1303,10 +1306,10 @@ class SimpleInMemoryKeyValueStorageTest {
boolean branch = storage.invoke(
new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1),
List.of(
- new Operation(Operation.Type.PUT, key1, val1_2),
- new Operation(Operation.Type.PUT, key2, val2)
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
),
- List.of(new Operation(Operation.Type.PUT, key3, val3))
+ List.of(new Operation(OperationType.PUT, key3, val3))
);
// "Success" branch is applied.
@@ -1358,10 +1361,10 @@ class SimpleInMemoryKeyValueStorageTest {
boolean branch = storage.invoke(
new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2),
- List.of(new Operation(Operation.Type.PUT, key3, val3)),
+ List.of(new Operation(OperationType.PUT, key3, val3)),
List.of(
- new Operation(Operation.Type.PUT, key1, val1_2),
- new Operation(Operation.Type.PUT, key2, val2)
+ new Operation(OperationType.PUT, key1, val1_2),
+ new Operation(OperationType.PUT, key2, val2)
)
);
@@ -1414,8 +1417,8 @@ class SimpleInMemoryKeyValueStorageTest {
// No-op.
boolean branch = storage.invoke(
new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
- List.of(new Operation(Operation.Type.NO_OP, null, null)),
- List.of(new Operation(Operation.Type.NO_OP, null, null))
+ List.of(new Operation(OperationType.NO_OP, null, null)),
+ List.of(new Operation(OperationType.NO_OP, null, null))
);
assertTrue(branch);
@@ -1428,10 +1431,10 @@ class SimpleInMemoryKeyValueStorageTest {
branch = storage.invoke(
new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
List.of(
- new Operation(Operation.Type.PUT, key2, val2),
- new Operation(Operation.Type.PUT, key3, val3)
+ new Operation(OperationType.PUT, key2, val2),
+ new Operation(OperationType.PUT, key3, val3)
),
- List.of(new Operation(Operation.Type.NO_OP, null, null))
+ List.of(new Operation(OperationType.NO_OP, null, null))
);
assertTrue(branch);
@@ -1462,10 +1465,10 @@ class SimpleInMemoryKeyValueStorageTest {
branch = storage.invoke(
new ValueCondition(ValueCondition.Type.EQUAL, key1, val1),
List.of(
- new Operation(Operation.Type.REMOVE, key2, null),
- new Operation(Operation.Type.REMOVE, key3, null)
+ new Operation(OperationType.REMOVE, key2, null),
+ new Operation(OperationType.REMOVE, key3, null)
),
- List.of(new Operation(Operation.Type.NO_OP, null, null))
+ List.of(new Operation(OperationType.NO_OP, null, null))
);
assertTrue(branch);
diff --git a/modules/metastorage/pom.xml b/modules/metastorage/pom.xml
index 6c3a3f8..7e98589 100644
--- a/modules/metastorage/pom.xml
+++ b/modules/metastorage/pom.xml
@@ -55,12 +55,12 @@
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-api</artifactId>
+ <artifactId>ignite-metastorage-server</artifactId>
</dependency>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-vault</artifactId>
+ <artifactId>ignite-api</artifactId>
</dependency>
<dependency>
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index f631db8..2249268 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
@@ -29,6 +30,8 @@ import java.util.stream.Collectors;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl;
+import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
+import org.apache.ignite.internal.metastorage.server.raft.MetaStorageCommandListener;
import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.util.Cursor;
@@ -128,19 +131,17 @@ public class MetaStorageManager {
if (hasMetastorage(locNodeName, metastorageNodes)) {
- //TODO:
-/*
this.metaStorageSvcFut = CompletableFuture.completedFuture(new MetaStorageServiceImpl(
raftMgr.startRaftGroup(
METASTORAGE_RAFT_GROUP_NAME,
clusterNetSvc.topologyService().allMembers().stream().filter(
metaStorageNodesContainsLocPred).
collect(Collectors.toList()),
- new MetaStorageCommandListener(new KeyValueStorageImpl())
+ new MetaStorageCommandListener(new SimpleInMemoryKeyValueStorage())
)
)
);
-*/
+
}
else if (metastorageNodes.length > 0) {
this.metaStorageSvcFut = CompletableFuture.completedFuture(new MetaStorageServiceImpl(
@@ -339,16 +340,16 @@ public class MetaStorageManager {
}
/**
- * @see MetaStorageService#removeAll(Collection)
+ * @see MetaStorageService#removeAll(Set)
*/
- public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<ByteArray> keys) {
+ public @NotNull CompletableFuture<Void> removeAll(@NotNull Set<ByteArray> keys) {
return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
}
/**
- * @see MetaStorageService#getAndRemoveAll(Collection)
+ * @see MetaStorageService#getAndRemoveAll(Set)
*/
- public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Collection<ByteArray> keys) {
+ public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAndRemoveAll(@NotNull Set<ByteArray> keys) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys));
}
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
index b50d603..5482f87 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
@@ -28,6 +28,8 @@ import org.apache.ignite.metastorage.client.WatchListener;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -49,19 +51,20 @@ public class WatchAggregatorTest {
entry("1", "value1n", 1, 1)
);
- var watchEvent1 = new WatchEvent(entryEvt1);
-
var entryEvt2 = new EntryEvent(
entry("2", "value2", 1, 1),
entry("2", "value2n", 1, 1)
);
- var watchEvent2 = new WatchEvent(entryEvt2);
-
watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(new WatchEvent(List.of(entryEvt1, entryEvt2)));
- verify(lsnr1).onUpdate(watchEvent1);
- verify(lsnr2).onUpdate(watchEvent2);
+ var watchEvt1Res = ArgumentCaptor.forClass(WatchEvent.class);
+ verify(lsnr1).onUpdate(watchEvt1Res.capture());
+ assertEquals(List.of(entryEvt1), watchEvt1Res.getValue().entryEvents());
+
+ var watchEvt2Res = ArgumentCaptor.forClass(WatchEvent.class);
+ verify(lsnr2).onUpdate(watchEvt2Res.capture());
+ assertEquals(List.of(entryEvt2), watchEvt2Res.getValue().entryEvents());
}
@Test
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 02ca745..c2710f8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -202,7 +202,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
UUID tblId = new UUID(revision, 0L);
if (hasMetastorageLocally) {
- var key = new ByteArray(INTERNAL_PREFIX + tblId.);
+ var key = new ByteArray(INTERNAL_PREFIX + tblId);
futs.add(metaStorageMgr.invoke(
Conditions.exists(key),
diff --git a/parent/pom.xml b/parent/pom.xml
index 3f037e7..3cc852d 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -176,6 +176,12 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-metastorage-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-network</artifactId>
<version>${project.version}</version>
</dependency>