You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2021/05/19 13:18:34 UTC
[ignite-3] branch ignite-14389 updated: IGNITE-14389 Code review
fixes
This is an automated email from the ASF dual-hosted git repository.
agura pushed a commit to branch ignite-14389
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-14389 by this push:
new e5056f3 IGNITE-14389 Code review fixes
e5056f3 is described below
commit e5056f3f124eee7f9496d8f8794f26ae0ca10445
Author: Andrey Gura <ag...@apache.org>
AuthorDate: Wed May 19 16:18:11 2021 +0300
IGNITE-14389 Code review fixes
---
.../client/ITMetaStorageServiceTest.java | 6 +-
.../internal/metastorage/client/CursorImpl.java | 15 +--
.../internal/metastorage/client/EntryImpl.java | 44 ++++-----
.../metastorage/client/MetaStorageServiceImpl.java | 27 ++++--
.../apache/ignite/metastorage/client/Entry.java | 14 +++
.../metastorage/client/MetaStorageService.java | 16 +--
.../ignite/metastorage/client/WatchEvent.java | 29 +++++-
.../metastorage/common/command/GetAllCommand.java | 6 +-
.../metastorage/common/command/InvokeCommand.java | 5 +-
.../common/command/WatchExactKeysCommand.java | 4 +-
.../ignite/internal/metastorage/server/Entry.java | 7 +-
.../server/SimpleInMemoryKeyValueStorage.java | 107 +++++++++++++--------
.../internal/metastorage/server/WatchEvent.java | 21 ++++
.../server/SimpleInMemoryKeyValueStorageTest.java | 6 +-
.../internal/metastorage/MetaStorageManager.java | 14 +--
.../internal/metastorage/WatchAggregatorTest.java | 8 ++
.../internal/table/distributed/TableManager.java | 4 +-
17 files changed, 219 insertions(+), 114 deletions(-)
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index ad626fa..2ffdd30 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -97,7 +97,7 @@ public class ITMetaStorageServiceTest {
public static final int LATEST_REVISION = -1;
/** Factory. */
- private static RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
+ private static final RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
/** Network factory. */
private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
@@ -240,7 +240,7 @@ public class ITMetaStorageServiceTest {
}
/**
- * Tests {@link MetaStorageService#getAll(Collection)}.
+ * Tests {@link MetaStorageService#getAll(Set)}.
*
* @throws Exception If failed.
*/
@@ -257,7 +257,7 @@ public class ITMetaStorageServiceTest {
}
/**
- * Tests {@link MetaStorageService#getAll(Collection, long)}.
+ * Tests {@link MetaStorageService#getAll(Set, long)}.
*
* @throws Exception If failed.
*/
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
index cee0934..dbbb2aa 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
@@ -58,7 +58,7 @@ public class CursorImpl<T> implements Cursor<T> {
CursorImpl(RaftGroupService metaStorageRaftGrpSvc, CompletableFuture<IgniteUuid> initOp, Function<Object, T> fn) {
this.metaStorageRaftGrpSvc = metaStorageRaftGrpSvc;
this.initOp = initOp;
- this.it = new InnerIterator<>();
+ this.it = new InnerIterator();
this.fn = fn;
}
@@ -80,17 +80,18 @@ public class CursorImpl<T> implements Cursor<T> {
}
}
- @Override
- public boolean hasNext() {
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
return it.hasNext();
}
- @Override
- public T next() {
+ /** {@inheritDoc} */
+ @Override public T next() {
return it.next();
}
- private class InnerIterator<T> implements Iterator<T> {
+ /** */
+ private class InnerIterator implements Iterator<T> {
/** {@inheritDoc} */
@Override public boolean hasNext() {
try {
@@ -107,7 +108,7 @@ public class CursorImpl<T> implements Cursor<T> {
/** {@inheritDoc} */
@Override public T next() {
try {
- return (T)initOp.thenCompose(
+ return initOp.thenCompose(
cursorId -> metaStorageRaftGrpSvc.run(new CursorNextCommand(cursorId))).thenApply(fn).get();
}
catch (InterruptedException | ExecutionException e) {
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
index c6c1e87..94164a0 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/EntryImpl.java
@@ -28,9 +28,11 @@ import org.jetbrains.annotations.Nullable;
*/
public final class EntryImpl implements Entry {
/** Key. */
+ @NotNull
private final ByteArray key;
/** Value. */
+ @Nullable
private final byte[] val;
/** Revision. */
@@ -47,50 +49,46 @@ public final class EntryImpl implements Entry {
* @param rev Revision.
* @param updCntr Update counter.
*/
- EntryImpl(ByteArray key, byte[] val, long rev, long updCntr) {
+ EntryImpl(@NotNull ByteArray key, @Nullable byte[] val, long rev, long updCntr) {
this.key = key;
this.val = val;
this.rev = rev;
this.updCntr = updCntr;
}
- /**
- * Returns key.
- *
- * @return Key.
- */
- @Override public @NotNull ByteArray key() {
+ /** {@inheritDoc} */ @NotNull
+ @Override public ByteArray key() {
return key;
}
- /**
- * Returns value.
- *
- * @return Value.
- */
- @Override public @Nullable byte[] value() {
+ /** {@inheritDoc} */
+ @Nullable
+ @Override public byte[] value() {
return val;
}
- /**
- * Returns revision.
- *
- * @return Revision.
- */
+ /** {@inheritDoc} */
@Override public long revision() {
return rev;
}
- /**
- * Returns update counter.
- *
- * @return Update counter.
- */
+ /** {@inheritDoc} */
@Override public long updateCounter() {
return updCntr;
}
/** {@inheritDoc} */
+ @Override public boolean tombstone() {
+ return val == null && rev > 0 && updCntr > 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean empty() {
+ return val == null && rev == 0 && updCntr == 0;
+ }
+
+
+ /** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
index bbae5fe..3d7bde9 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
@@ -50,7 +50,13 @@ import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.metastorage.client.*;
+import org.apache.ignite.metastorage.client.Condition;
+import org.apache.ignite.metastorage.client.Entry;
+import org.apache.ignite.metastorage.client.EntryEvent;
+import org.apache.ignite.metastorage.client.MetaStorageService;
+import org.apache.ignite.metastorage.client.Operation;
+import org.apache.ignite.metastorage.client.WatchEvent;
+import org.apache.ignite.metastorage.client.WatchListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -79,11 +85,7 @@ public class MetaStorageServiceImpl implements MetaStorageService {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Entry> get(@NotNull ByteArray key) {
- return metaStorageRaftGrpSvc.run(new GetCommand(key)).thenApply(obj -> {
- SingleEntryResponse resp = (SingleEntryResponse)obj;
-
- return new EntryImpl(new ByteArray(resp.key()), resp.value(), resp.revision(), resp.updateCounter());
- });
+ return metaStorageRaftGrpSvc.run(new GetCommand(key)).thenApply(MetaStorageServiceImpl::singleEntryResult);
}
/** {@inheritDoc} */
@@ -93,13 +95,13 @@ public class MetaStorageServiceImpl implements MetaStorageService {
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Collection<ByteArray> keys) {
+ @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) {
return metaStorageRaftGrpSvc.run(new GetAllCommand(keys))
.thenApply(MetaStorageServiceImpl::multipleEntryResult);
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Collection<ByteArray> keys, long revUpperBound) {
+ @Override public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
return metaStorageRaftGrpSvc.run(new GetAllCommand(keys, revUpperBound)).
thenApply(MetaStorageServiceImpl::multipleEntryResult);
}
@@ -221,7 +223,7 @@ public class MetaStorageServiceImpl implements MetaStorageService {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<IgniteUuid> watch(
- @NotNull Collection<ByteArray> keys,
+ @NotNull Set<ByteArray> keys,
long revision,
@NotNull WatchListener lsnr
) {
@@ -266,6 +268,8 @@ public class MetaStorageServiceImpl implements MetaStorageService {
info = new OperationInfo(inner.key(), inner.value(), OperationType.PUT);
}
+ else
+ assert false : "Unknown operation type " + op.type();
res.add(info);
}
@@ -294,10 +298,13 @@ public class MetaStorageServiceImpl implements MetaStorageService {
cnd = new ConditionInfo(inner.key(), inner.type(), inner.value(), 0);
}
+ else
+ assert false : "Unknown condition type: " + obj.getClass().getSimpleName();
return cnd;
}
+ /** */
private static Map<ByteArray, Entry> multipleEntryResult(Object obj) {
MultipleEntryResponse resp = (MultipleEntryResponse) obj;
@@ -312,12 +319,14 @@ public class MetaStorageServiceImpl implements MetaStorageService {
return res;
}
+ /** */
private static Entry singleEntryResult(Object obj) {
SingleEntryResponse resp = (SingleEntryResponse) obj;
return new EntryImpl(new ByteArray(resp.key()), resp.value(), resp.revision(), resp.updateCounter());
}
+ /** */
private static WatchEvent watchResponse(Object obj) {
MultipleEntryResponse resp = (MultipleEntryResponse) obj;
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Entry.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Entry.java
index 6e8832a..8895aac 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Entry.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/Entry.java
@@ -57,4 +57,18 @@ public interface Entry {
* @return Update counter.
*/
long updateCounter();
+
+ /**
+ * Returns value which denotes whether entry is empty or not.
+ *
+ * @return {@code True} if entry is empty, otherwise - {@code false}.
+ */
+ boolean empty();
+
+ /**
+ * Returns value which denotes whether entry is tombstone or not.
+ *
+ * @return {@code True} if entry is tombstone, otherwise - {@code false}.
+ */
+ boolean tombstone();
}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
index 35a7af8..787a463 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java
@@ -62,21 +62,21 @@ public interface MetaStorageService {
/**
* Retrieves entries for given keys.
*
- * @param keys The collection of keys. Couldn't be {@code null} or empty.
- * Collection elements couldn't be {@code null}.
+ * @param keys The set of keys. Couldn't be {@code null} or empty.
+ * Set elements couldn't be {@code null}.
* @return A map of entries for given keys. Couldn't be {@code null}.
* @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result.
* @see ByteArray
* @see Entry
*/
@NotNull
- CompletableFuture<Map<ByteArray, Entry>> getAll(Collection<ByteArray> keys);
+ CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys);
/**
* Retrieves entries for given keys and the revision upper bound.
*
- * @param keys The collection of keys. Couldn't be {@code null} or empty.
- * Collection elements couldn't be {@code null}.
+ * @param keys The set of keys. Couldn't be {@code null} or empty.
+ * Set elements couldn't be {@code null}.
* @param revUpperBound The upper bound for entry revisions. Must be positive.
* @return A map of entries for given keys and maximum revision limited by {@code revUpperBound}.
* Couldn't be {@code null}.
@@ -87,7 +87,7 @@ public interface MetaStorageService {
* @see Entry
*/
@NotNull
- CompletableFuture<Map<ByteArray, Entry>> getAll(Collection<ByteArray> keys, long revUpperBound);
+ CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound);
/**
* Inserts or updates an entry with the given key and the given value.
@@ -301,7 +301,7 @@ public interface MetaStorageService {
/**
* Subscribes on meta storage updates for given keys.
*
- * @param keys Collection of target keys. Could be {@code null}.
+ * @param keys Set of target keys. Could be {@code null}.
* @param revision Start revision inclusive. {@code 0} - all revision,
* {@code -1} - latest revision (accordingly to current meta storage state).
* @param lsnr Listener which will be notified for each update.
@@ -313,7 +313,7 @@ public interface MetaStorageService {
* @see Entry
*/
@NotNull
- CompletableFuture<IgniteUuid> watch(@NotNull Collection<ByteArray> keys, long revision, @NotNull WatchListener lsnr);
+ CompletableFuture<IgniteUuid> watch(@NotNull Set<ByteArray> keys, long revision, @NotNull WatchListener lsnr);
/**
* Cancels subscription for the given identifier.
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/WatchEvent.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/WatchEvent.java
index 1ddac71..147ff63 100644
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/WatchEvent.java
+++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/WatchEvent.java
@@ -17,14 +17,19 @@
package org.apache.ignite.metastorage.client;
-import org.jetbrains.annotations.NotNull;
-
import java.util.Collection;
import java.util.List;
+import org.jetbrains.annotations.NotNull;
+/**
+ * Watch event contains all entry updates done under one revision. Each particular entry update in this revision
+ * is represented by {@link EntryEvent} entity.
+ */
public class WatchEvent {
+ /** Events about each entry update in the revision. */
private final List<EntryEvent> entryEvts;
+ /** Designates that watch event contains only one entry event. */
private final boolean single;
/**
@@ -39,18 +44,38 @@ public class WatchEvent {
this.entryEvts = entryEvts;
}
+ /**
+ * Constructs watch event with single entry update.
+ *
+ * @param entryEvt Entry event.
+ */
public WatchEvent(@NotNull EntryEvent entryEvt) {
this(List.of(entryEvt));
}
+ /**
+ * Returns {@code true} if watch event contains only one entry event.
+ *
+ * @return {@code True} if watch event contains only one entry event.
+ */
public boolean single() {
return single;
}
+ /**
+ * Returns collection of entry events done under one revision.
+ *
+ * @return Collection of entry events done under one revision.
+ */
public Collection<EntryEvent> entryEvents() {
return entryEvts;
}
+ /**
+ * Returns entry event. This method is useful when we know that only one entry was modified.
+ *
+ * @return Entry event.
+ */
public EntryEvent entryEvent() {
return entryEvts.get(0);
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
index e20a793..c129931 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
@@ -18,8 +18,8 @@
package org.apache.ignite.internal.metastorage.common.command;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
+import java.util.Set;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.ReadCommand;
import org.jetbrains.annotations.NotNull;
@@ -39,7 +39,7 @@ public final class GetAllCommand implements ReadCommand {
* @param keys The collection of keys. Couldn't be {@code null} or empty. Collection elements couldn't be {@code
* null}.
*/
- public GetAllCommand(@NotNull Collection<ByteArray> keys) {
+ public GetAllCommand(@NotNull Set<ByteArray> keys) {
assert !keys.isEmpty();
this.keys = new ArrayList<>(keys.size());
@@ -53,7 +53,7 @@ public final class GetAllCommand implements ReadCommand {
* null}.
* @param revUpperBound The upper bound for entry revisions. Must be positive.
*/
- public GetAllCommand(@NotNull Collection<ByteArray> keys, long revUpperBound) {
+ public GetAllCommand(@NotNull Set<ByteArray> keys, long revUpperBound) {
this(keys);
assert revUpperBound > 0;
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
index 6981a0e..71ef3f0 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/InvokeCommand.java
@@ -17,14 +17,13 @@
package org.apache.ignite.internal.metastorage.common.command;
-import org.apache.ignite.raft.client.Command;
-
import java.util.List;
+import org.apache.ignite.raft.client.WriteCommand;
/**
* Represents invoke command for meta storage.
*/
-public class InvokeCommand implements Command {
+public class InvokeCommand implements WriteCommand {
/** Condition. */
private final ConditionInfo cond;
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
index efde780..87635a3 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
@@ -18,8 +18,8 @@
package org.apache.ignite.internal.metastorage.common.command;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
+import java.util.Set;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -38,7 +38,7 @@ public final class WatchExactKeysCommand implements WriteCommand {
* @param keys The keys collection. Couldn't be {@code null}.
* @param revision Start revision inclusive. {@code 0} - all revisions.
*/
- public WatchExactKeysCommand(@NotNull Collection<ByteArray> keys, long revision) {
+ public WatchExactKeysCommand(@NotNull Set<ByteArray> keys, long revision) {
this.keys = new ArrayList<>(keys.size());
for (ByteArray key : keys)
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
index ede6ce2..0b7670b 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Entry.java
@@ -25,7 +25,9 @@ import org.jetbrains.annotations.Nullable;
* <ul>
* <li>key - an unique entry's key represented by an array of bytes. Keys are comparable in lexicographic manner.</li>
* <li>value - a data which is associated with a key and represented as an array of bytes.</li>
- * <li>revision - a number which denotes a version of whole meta storage. Each change increments the revision.</li>
+ * <li>revision - a number which denotes a version of whole meta storage.
+ * Each change (which could include multiple entries) increments the revision. </li>
+ * <li>updateCounter - a number which increments on every update in the change under one revision.</li>
* </ul>
*
* Instance of {@link #Entry} could represents:
@@ -125,6 +127,7 @@ public class Entry {
@NotNull
public static Entry tombstone(byte[] key, long rev, long updCntr) {
assert rev > 0 : "rev must be positive for tombstone entry.";
+ assert updCntr > 0 : "updCntr must be positive for tombstone entry.";
return new Entry(key, rev, updCntr);
}
@@ -151,6 +154,7 @@ public class Entry {
/**
* Returns a revision.
+ *
* @return Revision.
*/
public long revision() {
@@ -159,6 +163,7 @@ public class Entry {
/**
* Returns a update counter.
+ *
* @return Update counter.
*/
public long updateCounter() {
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
index 6595380..271b8df 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java
@@ -31,36 +31,50 @@ import java.util.TreeSet;
import java.util.function.Predicate;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.TestOnly;
import static org.apache.ignite.internal.metastorage.server.Value.TOMBSTONE;
/**
- * WARNING: Only for test purposes and only for non-distributed (one static instance) storage.
+ * Simple in-memory key/value storage.
+ *
+ * WARNING: Only for test purposes.
*/
public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
+ /** Lexicographical comparator. */
private static final Comparator<byte[]> CMP = Arrays::compare;
+ /**
+ * Special value for revision number which means that operation should be applied
+ * to the latest revision of an entry.
+ */
private static final long LATEST_REV = -1;
+ /** Keys index. Value is the list of all revisions under which entry corresponding to the key was modified. */
private NavigableMap<byte[], List<Long>> keysIdx = new TreeMap<>(CMP);
+ /** Revisions index. Value contains all entries which were modified under particular revision. */
private NavigableMap<Long, NavigableMap<byte[], Value>> revsIdx = new TreeMap<>();
+ /** Revision. Will be incremented for each single-entry or multi-entry update operation. */
private long rev;
+ /** Update counter. Will be incremented for each update of any particular entry. */
private long updCntr;
+ /** All operations are queued on this lock. */
private final Object mux = new Object();
+ /** {@inheritDoc} */
@Override public long revision() {
return rev;
}
+ /** {@inheritDoc} */
@Override public long updateCounter() {
return updCntr;
}
+ /** {@inheritDoc} */
@Override public void put(byte[] key, byte[] value) {
synchronized (mux) {
long curRev = rev + 1;
@@ -71,6 +85,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
+ /** {@inheritDoc} */
@NotNull
@Override public Entry getAndPut(byte[] key, byte[] bytes) {
synchronized (mux) {
@@ -85,8 +100,8 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
- @Override
- public void putAll(List<byte[]> keys, List<byte[]> values) {
+ /** {@inheritDoc} */
+ @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
synchronized (mux) {
long curRev = rev + 1;
@@ -94,8 +109,9 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
- @Override
- public @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
Collection<Entry> res;
synchronized (mux) {
@@ -109,6 +125,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return res;
}
+ /** {@inheritDoc} */
@NotNull
@Override public Entry get(byte[] key) {
synchronized (mux) {
@@ -116,26 +133,28 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
+ /** {@inheritDoc} */
@NotNull
- @TestOnly
@Override public Entry get(byte[] key, long rev) {
synchronized (mux) {
return doGet(key, rev, true);
}
}
- @Override
- public @NotNull Collection<Entry> getAll(List<byte[]> keys) {
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Collection<Entry> getAll(List<byte[]> keys) {
return doGetAll(keys, LATEST_REV);
}
- @Override
- public @NotNull Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
return doGetAll(keys, revUpperBound);
}
- @Override
- public void remove(byte[] key) {
+ /** {@inheritDoc} */
+ @Override public void remove(byte[] key) {
synchronized (mux) {
long curRev = rev + 1;
@@ -144,17 +163,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
- private boolean doRemove(byte[] key, long curRev) {
- Entry e = doGet(key, LATEST_REV, false);
-
- if (e.empty() || e.tombstone())
- return false;
-
- doPut(key, TOMBSTONE, curRev);
-
- return true;
- }
-
+ /** {@inheritDoc} */
@NotNull
@Override public Entry getAndRemove(byte[] key) {
synchronized (mux) {
@@ -167,8 +176,8 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
- @Override
- public void removeAll(List<byte[]> keys) {
+ /** {@inheritDoc} */
+ @Override public void removeAll(List<byte[]> keys) {
synchronized (mux) {
long curRev = rev + 1;
@@ -191,8 +200,9 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
- @Override
- public @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
+ /** {@inheritDoc} */
+ @NotNull
+ @Override public Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
Collection<Entry> res = new ArrayList<>(keys.size());
synchronized (mux) {
@@ -221,6 +231,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return res;
}
+ /** {@inheritDoc} */
@Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
synchronized (mux) {
Entry e = get(condition.key());
@@ -262,14 +273,17 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
+ /** {@inheritDoc} */
@Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
return new RangeCursor(keyFrom, keyTo, rev);
}
+ /** {@inheritDoc} */
@Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
return new RangeCursor(keyFrom, keyTo, revUpperBound);
}
+ /** {@inheritDoc} */
@Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
assert keyFrom != null;
assert rev > 0;
@@ -279,6 +293,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
);
}
+ /** {@inheritDoc} */
@Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
assert key != null;
assert rev > 0;
@@ -286,6 +301,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return new WatchCursor(rev, k -> CMP.compare(k, key) == 0);
}
+ /** {@inheritDoc} */
@Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
assert keys != null && !keys.isEmpty();
assert rev > 0;
@@ -297,6 +313,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return new WatchCursor(rev, keySet::contains);
}
+ /** {@inheritDoc} */
@Override public void compact() {
synchronized (mux) {
NavigableMap<byte[], List<Long>> compactedKeysIdx = new TreeMap<>(CMP);
@@ -311,6 +328,19 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
+ /** */
+ private boolean doRemove(byte[] key, long curRev) {
+ Entry e = doGet(key, LATEST_REV, false);
+
+ if (e.empty() || e.tombstone())
+ return false;
+
+ doPut(key, TOMBSTONE, curRev);
+
+ return true;
+ }
+
+ /** */
private void compactForKey(
byte[] key,
List<Long> revs,
@@ -353,13 +383,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return res;
}
- /**
- * Returns entry for given key.
- *
- * @param key Key.
- * @param rev Revision.
- * @return Entry for given key.
- */
+ /** */
@NotNull
private Entry doGet(byte[] key, long rev, boolean exactRev) {
assert rev == LATEST_REV && !exactRev || rev > LATEST_REV :
@@ -405,6 +429,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return -1;
}
+ /** */
@NotNull
private Entry doGetValue(byte[] key, long lastRev) {
if (lastRev == 0)
@@ -423,6 +448,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return new Entry(key, lastVal.bytes(), lastRev, lastVal.updateCounter());
}
+ /** */
private long doPut(byte[] key, byte[] bytes, long curRev) {
long curUpdCntr = ++updCntr;
@@ -434,14 +460,8 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
revs.add(curRev);
// Update revsIdx.
- //NavigableMap<byte[], Value> entries = new TreeMap<>(CMP);
-
Value val = new Value(bytes, curUpdCntr);
- //entries.put(key, val);
-
- //revsIdx.put(curRev, entries);
-
revsIdx.compute(
curRev,
(rev, entries) -> {
@@ -457,6 +477,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
return lastRev;
}
+ /** */
private long doPutAll(long curRev, List<byte[]> keys, List<byte[]> bytesList) {
synchronized (mux) {
// Update revsIdx.
@@ -556,6 +577,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
@NotNull
Iterator<Entry> createIterator() {
return new Iterator<>() {
+ /** {@inheritDoc} */
@Override public boolean hasNext() {
synchronized (mux) {
while (true) {
@@ -607,6 +629,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
+ /** {@inheritDoc} */
@Override public Entry next() {
synchronized (mux) {
while (true) {
@@ -675,6 +698,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
@NotNull
Iterator<WatchEvent> createIterator() {
return new Iterator<>() {
+ /** {@inheritDoc} */
@Override public boolean hasNext() {
synchronized (mux) {
if (nextRetRev != -1)
@@ -701,6 +725,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage {
}
}
+ /** {@inheritDoc} */
@Override public WatchEvent next() {
synchronized (mux) {
while (true) {
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
index 4f2f305..24b1e98 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/WatchEvent.java
@@ -20,9 +20,15 @@ package org.apache.ignite.internal.metastorage.server;
import java.util.Collection;
import java.util.List;
+/**
+ * Watch event contains all entry updates done under one revision. Each particular entry update in this revision
+ * is represented by {@link EntryEvent} entity.
+ */
public class WatchEvent {
+ /** Events about each entry update in the revision. */
private final List<EntryEvent> entryEvts;
+ /** Designates that watch event contains only one entry event. */
private final boolean single;
/**
@@ -37,14 +43,29 @@ public class WatchEvent {
this.entryEvts = entryEvts;
}
+ /**
+ * Returns {@code true} if watch event contains only one entry event.
+ *
+ * @return {@code True} if watch event contains only one entry event.
+ */
public boolean single() {
return single;
}
+ /**
+ * Returns collection of entry events done under one revision.
+ *
+ * @return Collection of entry events done under one revision.
+ */
public Collection<EntryEvent> entryEvents() {
return entryEvts;
}
+ /**
+ * Returns the first entry event. It is useful when we know that only one entry was modified.
+ *
+ * @return Entry event.
+ */
public EntryEvent entryEvent() {
return entryEvts.get(0);
}
diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
index 595f40a..342b4ba 100644
--- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
+++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java
@@ -41,6 +41,7 @@ import static org.junit.jupiter.api.Assertions.fail;
* Tests for in-memory meta storage implementation.
*/
class SimpleInMemoryKeyValueStorageTest {
+ /** */
private KeyValueStorage storage;
@BeforeEach
@@ -1926,16 +1927,19 @@ class SimpleInMemoryKeyValueStorageTest {
assertFalse(it.hasNext());
}
+ /** */
private static void fill(KeyValueStorage storage, int keySuffix, int num) {
for (int i = 0; i < num; i++)
storage.getAndPut(k(keySuffix), kv(keySuffix, i + 1));
}
+ /** */
private static byte[] k(int k) {
return ("key" + k).getBytes();
}
+ /** */
private static byte[] kv(int k, int v) {
return ("key" + k + '_' + "val" + v).getBytes();
}
-}
\ No newline at end of file
+}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 2249268..acbbfd5 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -284,16 +284,16 @@ public class MetaStorageManager {
}
/**
- * @see MetaStorageService#getAll(Collection)
+ * @see MetaStorageService#getAll(Set)
*/
- public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Collection<ByteArray> keys) {
+ public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
}
/**
- * @see MetaStorageService#getAll(Collection, long)
+ * @see MetaStorageService#getAll(Set, long)
*/
- public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Collection<ByteArray> keys, long revUpperBound) {
+ public @NotNull CompletableFuture<Map<ByteArray, Entry>> getAll(Set<ByteArray> keys, long revUpperBound) {
return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound));
}
@@ -476,11 +476,7 @@ public class MetaStorageManager {
private CompletableFuture<Void> storeEntries(Collection<IgniteBiTuple<ByteArray, byte[]>> entries, long revision) {
try {
return vaultMgr.putAll(entries.stream().collect(
- Collectors.toMap(
- e -> ByteArray.fromString(e.getKey().toString()),
- IgniteBiTuple::getValue)
- ),
- revision);
+ Collectors.toMap(e -> e.getKey(), IgniteBiTuple::getValue)), revision);
}
catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException("Couldn't put entries with considered revision.", e);
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
index 5482f87..2c48c78 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
@@ -148,6 +148,14 @@ public class WatchAggregatorTest {
@Override public long updateCounter() {
return updateCntr;
}
+
+ @Override public boolean empty() {
+ return false;
+ }
+
+ @Override public boolean tombstone() {
+ return false;
+ }
};
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 241c5a2..8ebe2cf 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -202,7 +202,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
var key = new ByteArray(INTERNAL_PREFIX + tblId);
futs.add(metaStorageMgr.invoke(
- Conditions.exists(key),
+ Conditions.notExists(key),
Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
Operations.noop()).thenCompose(res ->
affMgr.calculateAssignments(tblId)));
@@ -240,7 +240,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
UUID tblId = t.internalTable().tableId();
if (hasMetastorageLocally) {
- var key = new ByteArray(INTERNAL_PREFIX + tblId.toString());
+ var key = new ByteArray(INTERNAL_PREFIX + tblId);
futs.add(affMgr.removeAssignment(tblId).thenCompose(res ->
metaStorageMgr.invoke(Conditions.exists(key),