You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/05/11 19:06:55 UTC
[ignite-3] branch main updated: Revert "IGNITE-14664 Implemented
MetaStorageService and corresponding raft commands. Fixes #116"
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 66bb2ef Revert "IGNITE-14664 Implemented MetaStorageService and corresponding raft commands. Fixes #116"
66bb2ef is described below
commit 66bb2ef9ba6d8ae8745d588d17a0e0b1e1b23f4f
Author: Slava Koptilin <sl...@gmail.com>
AuthorDate: Tue May 11 22:06:10 2021 +0300
Revert "IGNITE-14664 Implemented MetaStorageService and corresponding raft commands. Fixes #116"
This reverts commit 22c060a18353a7861d474811bb4e501d99414792.
---
.../ignite/internal/affinity/AffinityManager.java | 24 +-
modules/metastorage-client/pom.xml | 29 -
.../metastorage/client/MetaStorageServiceTest.java | 1073 --------------------
.../internal/metastorage/client/CursorImpl.java | 99 --
.../metastorage/client/MetaStorageServiceImpl.java | 361 -------
modules/metastorage-common/pom.xml | 5 -
.../internal/metastorage/common/DummyEntry.java | 104 --
.../metastorage/common/command/GetAllCommand.java | 78 --
.../common/command/GetAndPutAllCommand.java | 62 --
.../common/command/GetAndPutCommand.java | 57 --
.../common/command/GetAndRemoveAllCommand.java | 53 -
.../common/command/GetAndRemoveCommand.java | 45 -
.../metastorage/common/command/GetCommand.java | 68 --
.../metastorage/common/command/PutAllCommand.java | 52 -
.../metastorage/common/command/PutCommand.java | 57 --
.../metastorage/common/command/RangeCommand.java | 83 --
.../common/command/RemoveAllCommand.java | 50 -
.../metastorage/common/command/RemoveCommand.java | 44 -
.../common/command/WatchExactKeysCommand.java | 63 --
.../common/command/WatchRangeKeysCommand.java | 81 --
.../common/command/cursor/CursorCloseCommand.java | 44 -
.../command/cursor/CursorHasNextCommand.java | 44 -
.../common/command/cursor/CursorNextCommand.java | 44 -
.../apache/ignite/metastorage/common/Entry.java | 7 -
.../org/apache/ignite/metastorage/common/Key.java | 3 +-
.../ignite/metastorage/common/KeyValueStorage.java | 91 --
.../metastorage/common/KeyValueStorageImpl.java | 167 ---
.../ignite/metastorage/common/WatchEvent.java | 36 +-
.../common/raft/MetaStorageCommandListener.java | 297 ------
.../internal/metastorage/MetaStorageManager.java | 219 +---
.../internal/metastorage/WatchAggregatorTest.java | 30 +-
.../java/org/apache/ignite/internal/raft/Loza.java | 12 -
.../ignite/internal/runner/app/IgnitionTest.java | 6 +-
.../apache/ignite/internal/app/IgnitionImpl.java | 113 ++-
.../internal/table/distributed/TableManager.java | 24 +-
.../distributed/storage/InternalTableImpl.java | 14 +-
36 files changed, 225 insertions(+), 3414 deletions(-)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index f184c9c..37690bf 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -87,7 +87,7 @@ public class AffinityManager {
configurationMgr.configurationRegistry().getConfiguration(ClusterConfiguration.KEY)
.metastorageNodes().listen(ctx -> {
if (ctx.newValue() != null) {
- if (MetaStorageManager.hasMetastorageLocally(localNodeName, ctx.newValue()))
+ if (hasMetastorageLocally(localNodeName, ctx.newValue()))
subscribeToAssignmentCalculation();
else
unsubscribeFromAssignmentCalculation();
@@ -98,11 +98,31 @@ public class AffinityManager {
String[] metastorageMembers = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
.metastorageNodes().value();
- if (MetaStorageManager.hasMetastorageLocally(localNodeName, metastorageMembers))
+ if (hasMetastorageLocally(localNodeName, metastorageMembers))
subscribeToAssignmentCalculation();
}
/**
+ * Checks whether the local node hosts Metastorage.
+ *
+ * @param localNodeName Local node unique name.
+ * @param metastorageMembers Metastorage members names.
+ * @return True if the node has Metastorage, false otherwise.
+ */
+ private boolean hasMetastorageLocally(String localNodeName, String[] metastorageMembers) {
+ boolean isLocalNodeHasMetasorage = false;
+
+ for (String name : metastorageMembers) {
+ if (name.equals(localNodeName)) {
+ isLocalNodeHasMetasorage = true;
+
+ break;
+ }
+ }
+ return isLocalNodeHasMetasorage;
+ }
+
+ /**
* Subscribes to metastorage members update.
*/
private void subscribeToAssignmentCalculation() {
diff --git a/modules/metastorage-client/pom.xml b/modules/metastorage-client/pom.xml
index ccf9ac5..d2f44b8 100644
--- a/modules/metastorage-client/pom.xml
+++ b/modules/metastorage-client/pom.xml
@@ -35,36 +35,7 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-raft-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
<artifactId>ignite-metastorage-common</artifactId>
</dependency>
-
- <!-- Test dependencies -->
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-engine</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.junit.jupiter</groupId>
- <artifactId>junit-jupiter-params</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.mockito</groupId>
- <artifactId>mockito-junit-jupiter</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-raft</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceTest.java
deleted file mode 100644
index fe2b4c1..0000000
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceTest.java
+++ /dev/null
@@ -1,1073 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.client;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.ignite.internal.metastorage.common.DummyEntry;
-import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.metastorage.client.MetaStorageService;
-import org.apache.ignite.metastorage.common.CompactedException;
-import org.apache.ignite.metastorage.common.Condition;
-import org.apache.ignite.metastorage.common.Cursor;
-import org.apache.ignite.metastorage.common.Entry;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.metastorage.common.KeyValueStorage;
-import org.apache.ignite.metastorage.common.Operation;
-import org.apache.ignite.metastorage.common.OperationTimeoutException;
-import org.apache.ignite.metastorage.common.WatchEvent;
-import org.apache.ignite.metastorage.common.WatchListener;
-import org.apache.ignite.metastorage.common.raft.MetaStorageCommandListener;
-import org.apache.ignite.network.ClusterLocalConfiguration;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.ClusterServiceFactory;
-import org.apache.ignite.network.message.MessageSerializationRegistry;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
-import org.apache.ignite.raft.client.Peer;
-import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
-import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
-import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.raft.server.RaftServer;
-import org.apache.ignite.raft.server.impl.RaftServerImpl;
-import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-
-import static org.junit.jupiter.api.Assertions.assertArrayEquals;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Meta storage client tests.
- */
-@SuppressWarnings("WeakerAccess") public class MetaStorageServiceTest {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageServiceTest.class);
-
- /** Base network port. */
- private static final int NODE_PORT_BASE = 20_000;
-
- /** Nodes. */
- private static final int NODES = 2;
-
- /** */
- private static final String METASTORAGE_RAFT_GROUP_NAME = "METASTORAGE_RAFT_GROUP";
-
- /** */
- public static final int LATEST_REVISION = -1;
-
- /** Factory. */
- private static RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
-
- /** Network factory. */
- private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
-
- /** */
- // TODO: IGNITE-14088 Uncomment and use real serializer provider
- private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
-
- /** Expected result entry. */
- private static final Entry EXPECTED_RESULT_ENTRY =
- new DummyEntry(
- new Key(new byte[] {1}),
- new byte[] {2},
- 10,
- 2
- );
-
- /** Expected result map. */
- private static final NavigableMap<Key, Entry> EXPECTED_RESULT_MAP;
-
-
- /** Cluster. */
- private ArrayList<ClusterService> cluster = new ArrayList<>();
-
- /** Meta storage raft server. */
- private RaftServer metaStorageRaftSrv;
-
- static {
- EXPECTED_RESULT_MAP = new TreeMap<>();
-
- EXPECTED_RESULT_MAP.put(
- new Key(new byte[] {1}),
- new DummyEntry(
- new Key(new byte[] {1}),
- new byte[] {2},
- 10,
- 2
- )
- );
-
- EXPECTED_RESULT_MAP.put(
- new Key(new byte[] {3}),
- new DummyEntry(
- new Key(new byte[] {3}),
- new byte[] {4},
- 10,
- 3
- )
- );
- }
-
- /**
- * Run @{code} NODES cluster nodes.
- */
- @BeforeEach
- public void beforeTest() {
- for (int i = 0; i < NODES; i++) {
- cluster.add(
- startClusterNode(
- "node_" + i,
- NODE_PORT_BASE + i,
- IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES).boxed().
- map((port) -> "localhost:" + port).collect(Collectors.toList())));
- }
-
- for (ClusterService node : cluster)
- assertTrue(waitForTopology(node, NODES, 1000));
-
- LOG.info("Cluster started.");
- }
-
- /**
- * Shutdown raft server and stop all cluster nodes.
- *
- * @throws Exception If failed to shutdown raft server,
- */
- @AfterEach
- public void afterTest() throws Exception {
- metaStorageRaftSrv.shutdown();
-
- for (ClusterService node : cluster)
- node.shutdown();
- }
-
- /**
- * Tests {@link MetaStorageService#get(Key)}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testGet() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull Entry get(byte[] key) {
- return EXPECTED_RESULT_ENTRY;
- }
- });
-
- assertEquals(EXPECTED_RESULT_ENTRY, metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key()).get());
- }
-
- /**
- * Tests {@link MetaStorageService#get(Key, long)}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testGetWithUpperBoundRevision() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull Entry get(byte[] key, long rev) {
- return EXPECTED_RESULT_ENTRY;
- }
- });
-
- assertEquals(
- EXPECTED_RESULT_ENTRY,
- metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key(), EXPECTED_RESULT_ENTRY.revision()).get()
- );
- }
-
- /**
- * Tests {@link MetaStorageService#getAll(Collection)}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testGetAll() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull List<Entry> getAll(List<byte[]> keys) {
- return new ArrayList<>(EXPECTED_RESULT_MAP.values());
- }
- });
-
- assertEquals(
- EXPECTED_RESULT_MAP,
- metaStorageSvc.getAll(EXPECTED_RESULT_MAP.keySet()).get()
- );
- }
-
- /**
- * Tests {@link MetaStorageService#getAll(Collection, long)}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testGetAllWithUpperBoundRevision() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull List<Entry> getAll(List<byte[]> keys, long revUpperBound) {
- return new ArrayList<>(EXPECTED_RESULT_MAP.values());
- }
- });
-
- assertEquals(
- EXPECTED_RESULT_MAP,
- metaStorageSvc.getAll(EXPECTED_RESULT_MAP.keySet(), 10).get()
- );
- }
-
- /**
- * Tests {@link MetaStorageService#put(Key, byte[])}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testPut() throws Exception {
- Key expKey = new Key(new byte[]{1});
-
- byte[] expVal = new byte[]{2};
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @SuppressWarnings("JavaAbbreviationUsage") @Override public void put(byte[] key, byte[] value) {
- assertArrayEquals(expKey.bytes(), key);
-
- assertArrayEquals(expVal, value);
- }
- });
-
- metaStorageSvc.put(expKey, expVal).get();
- }
-
- /**
- * Tests {@link MetaStorageService#getAndPut(Key, byte[])}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testGetAndPut() throws Exception {
- byte[] expVal = new byte[]{2};
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @SuppressWarnings("JavaAbbreviationUsage") @Override public @NotNull Entry getAndPut(byte[] key, byte[] value) {
- assertArrayEquals(EXPECTED_RESULT_ENTRY.key().bytes(), key);
-
- assertArrayEquals(expVal, value);
-
- return EXPECTED_RESULT_ENTRY;
- }
- });
-
- assertEquals(
- EXPECTED_RESULT_ENTRY,
- metaStorageSvc.getAndPut(EXPECTED_RESULT_ENTRY.key(), expVal).get()
- );
- }
-
- /**
- * Tests {@link MetaStorageService#putAll(Map)}.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("ConstantConditions")
- @Test
- public void testPutAll() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
- // Assert keys equality.
- assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
-
- List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(Key::bytes).collect(Collectors.toList());
-
- for (int i = 0; i < expKeys.size(); i++)
- assertArrayEquals(expKeys.get(i), keys.get(i));
-
-
- // Assert values equality.
- assertEquals(EXPECTED_RESULT_MAP.values().size(), values.size());
-
- List<byte[]> expVals = EXPECTED_RESULT_MAP.values().stream().
- map(Entry::value).collect(Collectors.toList());
-
- for (int i = 0; i < expKeys.size(); i++)
- assertArrayEquals(expVals.get(i), values.get(i));
- }
- });
-
- metaStorageSvc.putAll(
- EXPECTED_RESULT_MAP.entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> e.getValue().value())
- )
- ).get();
- }
-
- /**
- * Tests {@link MetaStorageService#getAndPutAll(Map)}.
- *
- * @throws Exception If failed.
- */
- @SuppressWarnings("ConstantConditions")
- @Test
- public void testGetAndPutAll() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull List<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
- // Assert keys equality.
- assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
-
- List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(Key::bytes).collect(Collectors.toList());
-
- for (int i = 0; i < expKeys.size(); i++)
- assertArrayEquals(expKeys.get(i), keys.get(i));
-
- // Assert values equality.
- assertEquals(EXPECTED_RESULT_MAP.values().size(), values.size());
-
- List<byte[]> expVals = EXPECTED_RESULT_MAP.values().stream().
- map(Entry::value).collect(Collectors.toList());
-
- for (int i = 0; i < expKeys.size(); i++)
- assertArrayEquals(expVals.get(i), values.get(i));
-
- return new ArrayList<>(EXPECTED_RESULT_MAP.values());
- }
- });
-
- Map<Key, Entry> gotRes = metaStorageSvc.getAndPutAll(
- EXPECTED_RESULT_MAP.entrySet().stream()
- .collect(Collectors.toMap(
- Map.Entry::getKey,
- e -> e.getValue().value())
- )
- ).get();
-
- assertEquals(EXPECTED_RESULT_MAP, gotRes);
- }
-
- /**
- * Tests {@link MetaStorageService#remove(Key)}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRemove() throws Exception {
- Key expKey = new Key(new byte[]{1});
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public void remove(byte[] key) {
- assertArrayEquals(expKey.bytes(), key);
- }
- });
-
- metaStorageSvc.remove(expKey).get();
- }
-
- /**
- * Tests {@link MetaStorageService#getAndRemove(Key)}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testGetAndRemove() throws Exception {
- Entry expRes = new DummyEntry(
- new Key(new byte[]{1}),
- new byte[]{3},
- 10,
- 2
- );
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull Entry getAndRemove(byte[] key) {
- assertArrayEquals(expRes.key().bytes(), key);
-
- return expRes;
- }
- });
-
- assertEquals(expRes, metaStorageSvc.getAndRemove(expRes.key()).get());
- }
-
- /**
- * Tests {@link MetaStorageService#removeAll(Collection)}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRemoveAll() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public void removeAll(List<byte[]> keys) {
- assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
-
- List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(Key::bytes).collect(Collectors.toList());
-
- for (int i = 0; i < expKeys.size(); i++)
- assertArrayEquals(expKeys.get(i), keys.get(i));
- }
- });
-
- metaStorageSvc.removeAll(EXPECTED_RESULT_MAP.keySet()).get();
- }
-
- /**
- * Tests {@link MetaStorageService#getAndRemoveAll(Collection)}.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testGetAndRemoveAll() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull List<Entry> getAndRemoveAll(List<byte[]> keys) {
- // Assert keys equality.
- assertEquals(EXPECTED_RESULT_MAP.keySet().size(), keys.size());
-
- List<byte[]> expKeys = EXPECTED_RESULT_MAP.keySet().stream().
- map(Key::bytes).collect(Collectors.toList());
-
- for (int i = 0; i < expKeys.size(); i++)
- assertArrayEquals(expKeys.get(i), keys.get(i));
-
- return new ArrayList<>(EXPECTED_RESULT_MAP.values());
- }
- });
-
- Map<Key, Entry> gotRes = metaStorageSvc.getAndRemoveAll(EXPECTED_RESULT_MAP.keySet()).get();
-
- assertEquals(EXPECTED_RESULT_MAP, gotRes);
- }
-
-
- /**
- * Tests {@link MetaStorageService#range(Key, Key, long)}} with not null keyTo and explicit revUpperBound.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRangeWitKeyToAndUpperBound() throws Exception {
- Key expKeyFrom = new Key(new byte[]{1});
-
- Key expKeyTo = new Key(new byte[]{3});
-
- long expRevUpperBound = 10;
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- assertArrayEquals(expKeyFrom.bytes(), keyFrom);
-
- assertArrayEquals(expKeyTo.bytes(), keyTo);
-
- assertEquals(expRevUpperBound, revUpperBound);
-
- return new Cursor<>() {
- @Override public void close() throws Exception {
-
- }
-
- @NotNull @Override public Iterator<Entry> iterator() {
- return new Iterator<>() {
- @Override public boolean hasNext() {
- return false;
- }
-
- @Override public Entry next() {
- return null;
- }
- };
- }
- };
- }
- });
-
- metaStorageSvc.range(expKeyFrom, expKeyTo, expRevUpperBound).close();
- }
-
- /**
- * Tests {@link MetaStorageService#range(Key, Key, long)}} with not null keyTo.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRangeWitKeyTo() throws Exception {
- Key expKeyFrom = new Key(new byte[]{1});
-
- Key expKeyTo = new Key(new byte[]{3});
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- assertArrayEquals(expKeyFrom.bytes(), keyFrom);
-
- assertArrayEquals(expKeyTo.bytes(), keyTo);
-
- assertEquals(LATEST_REVISION, revUpperBound);
-
- return new Cursor<>() {
- @Override public void close() throws Exception {
-
- }
-
- @NotNull @Override public Iterator<Entry> iterator() {
- return new Iterator<>() {
- @Override public boolean hasNext() {
- return false;
- }
-
- @Override public Entry next() {
- return null;
- }
- };
- }
- };
- }
- });
-
- metaStorageSvc.range(expKeyFrom, expKeyTo).close();
- }
-
- /**
- * Tests {@link MetaStorageService#range(Key, Key, long)}} with null keyTo.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRangeWitNullAsKeyTo() throws Exception {
- Key expKeyFrom = new Key(new byte[]{1});
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- assertArrayEquals(expKeyFrom.bytes(), keyFrom);
-
- assertNull(keyTo);
-
- assertEquals(LATEST_REVISION, revUpperBound);
-
- return new Cursor<>() {
- @Override public void close() throws Exception {
-
- }
-
- @NotNull @Override public Iterator<Entry> iterator() {
- return new Iterator<>() {
- @Override public boolean hasNext() {
- return false;
- }
-
- @Override public Entry next() {
- return null;
- }
- };
- }
- };
- }
- });
-
- metaStorageSvc.range(expKeyFrom, null).close();
- }
-
- /**
- * Tests {@link MetaStorageService#range(Key, Key, long)}} hasNext.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRangeHasNext() throws Exception {
- Key expKeyFrom = new Key(new byte[]{1});
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- return new Cursor<>() {
- @Override public void close() throws Exception {
-
- }
-
- @NotNull @Override public Iterator<Entry> iterator() {
- return new Iterator<>() {
- @Override public boolean hasNext() {
- return true;
- }
-
- @Override public Entry next() {
- return null;
- }
- };
- }
- };
- }
- });
-
- Cursor<Entry> cursor = metaStorageSvc.range(expKeyFrom, null);
-
- assertTrue(cursor.iterator().hasNext());
- }
-
- /**
- * Tests {@link MetaStorageService#range(Key, Key, long)}} next.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRangeNext() throws Exception {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- return new Cursor<>() {
- @Override public void close() throws Exception {
-
- }
-
- @NotNull @Override public Iterator<Entry> iterator() {
- return new Iterator<>() {
- @Override public boolean hasNext() {
- return true;
- }
-
- @Override public Entry next() {
- return EXPECTED_RESULT_ENTRY;
- }
- };
- }
- };
- }
- });
-
- Cursor<Entry> cursor = metaStorageSvc.range(EXPECTED_RESULT_ENTRY.key(), null);
-
- assertEquals(EXPECTED_RESULT_ENTRY, (cursor.iterator().next()));
- }
-
- /**
- * Tests {@link MetaStorageService#range(Key, Key, long)}} close.
- *
- * @throws Exception If failed.
- */
- @Test
- public void testRangeClose() throws Exception {
- Key expKeyFrom = new Key(new byte[]{1});
-
- Cursor cursorMock = mock(Cursor.class);
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- return cursorMock;
- }
- });
-
- Cursor<Entry> cursor = metaStorageSvc.range(expKeyFrom, null);
-
- cursor.close();
-
- verify(cursorMock, times(1)).close();
- }
-
- @Test
- public void testWatchOnUpdate() throws Exception {
- List<WatchEvent> returnedWatchEvents = Arrays.asList(
- new WatchEvent(
- new DummyEntry(
- new Key(new byte[]{2}),
- new byte[]{20},
- 1,
- 1
- ),
- new DummyEntry(
- new Key(new byte[]{2}),
- new byte[]{21},
- 2,
- 4
- )
- ),
- new WatchEvent(
- new DummyEntry(
- new Key(new byte[] {3}),
- new byte[] {20},
- 1,
- 2
- ),
- new DummyEntry(
- new Key(new byte[] {3}),
- null,
- 2,
- 5
- )
- ),
- new WatchEvent(
- new DummyEntry(
- new Key(new byte[] {4}),
- new byte[] {20},
- 1,
- 3
- ),
- new DummyEntry(
- new Key(new byte[] {4}),
- null,
- 3,
- 6
- )
- )
- );
-
- Key keyFrom = new Key(new byte[]{1});
-
- Key keyTo = new Key(new byte[]{10});
-
- long rev = 2;
-
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
- return new Cursor<>() {
- AtomicInteger retirevedItemCnt = new AtomicInteger(0);
-
- @Override public void close() throws Exception {
- // No-op.
- }
-
- @NotNull @Override public Iterator<WatchEvent> iterator() {
- return new Iterator<WatchEvent>() {
- @Override public boolean hasNext() {
-
- return retirevedItemCnt.get() < returnedWatchEvents.size();
- }
-
- @Override public WatchEvent next() {
- return returnedWatchEvents.get(retirevedItemCnt.getAndIncrement());
- }
- };
- }
- };
- }
- });
-
- CountDownLatch latch = new CountDownLatch(1);
-
- IgniteUuid watchId = metaStorageSvc.watch(keyFrom, keyTo, rev, new WatchListener() {
- @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
- List gotEvents = new ArrayList();
-
- Iterator<WatchEvent> iter = events.iterator();
-
- while (iter.hasNext())
- gotEvents.add(iter.next());
-
- assertEquals(2, gotEvents.size());
-
- assertTrue(gotEvents.contains(returnedWatchEvents.get(0)));
-
- assertTrue(gotEvents.contains(returnedWatchEvents.get(1)));
-
- latch.countDown();
- return true;
- }
-
- @Override public void onError(@NotNull Throwable e) {
- // Within given test it's not expected to get here.
- fail();
- }
- }).get();
-
- latch.await();
-
- metaStorageSvc.stopWatch(watchId).get();
- }
-
- // TODO: IGNITE-14693 Add tests for exception handling logic: onError,
- // TODO: (CompactedException | OperationTimeoutException)
-
-
- /**
- * Tests {@link MetaStorageService#get(Key)}.
- *
- * @throws Exception If failed.
- */
- @Disabled // TODO: IGNITE-14693 Add tests for exception handling logic.
- @Test
- public void testGetThatThrowsCompactedException() {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull Entry get(byte[] key) {
- throw new CompactedException();
- }
- });
-
- assertThrows(CompactedException.class, () -> metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key()).get());
- }
-
- /**
- * Tests {@link MetaStorageService#get(Key)}.
- *
- * @throws Exception If failed.
- */
- @Disabled // TODO: IGNITE-14693 Add tests for exception handling logic.
- @Test
- public void testGetThatThrowsOperationTimeoutException() {
- MetaStorageService metaStorageSvc = prepareMetaStorage(
- new AbstractKeyValueStorage() {
- @Override public @NotNull Entry get(byte[] key) {
- throw new OperationTimeoutException();
- }
- });
-
- assertThrows(OperationTimeoutException.class, () -> metaStorageSvc.get(EXPECTED_RESULT_ENTRY.key()).get());
- }
-
- /**
- * @param name Node name.
- * @param port Local port.
- * @param srvs Server nodes of the cluster.
- * @return The client cluster view.
- */
- private ClusterService startClusterNode(String name, int port, List<String> srvs) {
- var ctx = new ClusterLocalConfiguration(name, port, srvs, SERIALIZATION_REGISTRY);
-
- var net = NETWORK_FACTORY.createClusterService(ctx);
-
- net.start();
-
- return net;
- }
-
- /**
- * @param cluster The cluster.
- * @param exp Expected count.
- * @param timeout The timeout in millis.
- * @return {@code True} if topology size is equal to expected.
- */
- @SuppressWarnings("SameParameterValue")
- private boolean waitForTopology(ClusterService cluster, int exp, int timeout) {
- long stop = System.currentTimeMillis() + timeout;
-
- while (System.currentTimeMillis() < stop) {
- if (cluster.topologyService().allMembers().size() >= exp)
- return true;
-
- try {
- Thread.sleep(50);
- }
- catch (InterruptedException e) {
- return false;
- }
- }
-
- return false;
- }
-
- /**
- * Prepares metaStorage by instantiating corresponding raft server with {@link MetaStorageCommandListener} and
- * {@link MetaStorageServiceImpl}.
- *
- * @param keyValStorageMock {@link KeyValueStorage} mock.
- * @return {@link MetaStorageService} instance.
- */
- private MetaStorageService prepareMetaStorage(KeyValueStorage keyValStorageMock) {
- metaStorageRaftSrv = new RaftServerImpl(
- cluster.get(0),
- FACTORY,
- 1000,
- Map.of(METASTORAGE_RAFT_GROUP_NAME, new MetaStorageCommandListener(keyValStorageMock))
- );
-
- RaftGroupService metaStorageRaftGrpSvc = new RaftGroupServiceImpl(
- METASTORAGE_RAFT_GROUP_NAME,
- cluster.get(1),
- FACTORY,
- 10_000,
- List.of(new Peer(cluster.get(0).topologyService().localMember())),
- true,
- 200
- );
-
- return new MetaStorageServiceImpl(metaStorageRaftGrpSvc);
- }
-
- /**
- * Abstract {@link KeyValueStorage}. Used for tests purposes.
- */
- @SuppressWarnings({"ConstantConditions", "JavaAbbreviationUsage"})
- private abstract class AbstractKeyValueStorage implements KeyValueStorage {
- /** {@inheritDoc} */
- @Override public long revision() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public long updateCounter() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Entry get(byte[] key) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Entry get(byte[] key, long rev) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Collection<Entry> getAll(List<byte[]> keys) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void put(byte[] key, byte[] value) {
- fail();
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Entry getAndPut(byte[] key, byte[] value) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
- fail();
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void remove(byte[] key) {
- fail();
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Entry getAndRemove(byte[] key) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void removeAll(List<byte[]> keys) {
- fail();
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
- fail();
-
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
- fail();
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void compact() {
- fail();
- }
- }
-}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
deleted file mode 100644
index b507798..0000000
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/CursorImpl.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.client;
-
-import java.util.Iterator;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
-import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
-import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.metastorage.common.Cursor;
-import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Meta storage service side implementation of cursor.
- * @param <T> Cursor parameter.
- */
-public class CursorImpl<T> implements Cursor<T> {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(CursorImpl.class);
-
- /** Future that runs metastorage service operation that provides cursor. */
- private CompletableFuture<IgniteUuid> initOp;
-
- /** Meta storage raft group service. */
- private RaftGroupService metaStorageRaftGrpSvc;
-
- /**
- * @param metaStorageRaftGrpSvc Meta storage raft group service.
- * @param initOp Future that runs metastorage service operation that provides cursor.
- */
- CursorImpl(RaftGroupService metaStorageRaftGrpSvc, CompletableFuture<IgniteUuid> initOp) {
- this.metaStorageRaftGrpSvc = metaStorageRaftGrpSvc;
- this.initOp = initOp;
- }
-
- /** {@inheritDoc} */
- @NotNull @Override public Iterator<T> iterator() {
- return new Iterator<>() {
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- try {
- return initOp.thenCompose(
- cursorId -> metaStorageRaftGrpSvc.<Boolean>run(new CursorHasNextCommand(cursorId))).get();
- }
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Unable to evaluate cursor hasNext command", e);
-
- throw new IgniteInternalException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public T next() {
- try {
- return initOp.thenCompose(
- cursorId -> metaStorageRaftGrpSvc.<T>run(new CursorNextCommand(cursorId))).get();
- }
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Unable to evaluate cursor hasNext command", e);
-
- throw new IgniteInternalException(e);
- }
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public void close() {
- try {
- initOp.thenCompose(
- cursorId -> metaStorageRaftGrpSvc.run(new CursorCloseCommand(cursorId))).get();
- }
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Unable to evaluate cursor close command", e);
-
- throw new IgniteInternalException(e);
- }
- }
-}
diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java b/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
deleted file mode 100644
index a4af354..0000000
--- a/modules/metastorage-client/src/main/java/org/apache/ignite/internal/metastorage/client/MetaStorageServiceImpl.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.client;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetCommand;
-import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.PutCommand;
-import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
-import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
-import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
-import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.metastorage.client.MetaStorageService;
-import org.apache.ignite.metastorage.common.Condition;
-import org.apache.ignite.metastorage.common.Cursor;
-import org.apache.ignite.metastorage.common.Entry;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.metastorage.common.Operation;
-import org.apache.ignite.metastorage.common.WatchEvent;
-import org.apache.ignite.metastorage.common.WatchListener;
-import org.apache.ignite.raft.client.service.RaftGroupService;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * {@link MetaStorageService} implementation.
- */
-public class MetaStorageServiceImpl implements MetaStorageService {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageServiceImpl.class);
-
- /** Meta storage raft group service. */
- private final RaftGroupService metaStorageRaftGrpSvc;
-
- // TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
- /** Watch processor, that uses pulling logic in order to retrieve watch notifications from server. */
- private final WatchProcessor watchProcessor;
-
- /**
- * @param metaStorageRaftGrpSvc Meta storage raft group service.
- */
- public MetaStorageServiceImpl(RaftGroupService metaStorageRaftGrpSvc) {
- this.metaStorageRaftGrpSvc = metaStorageRaftGrpSvc;
- this.watchProcessor = new WatchProcessor();
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
- return metaStorageRaftGrpSvc.run(new GetCommand(key));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
- return metaStorageRaftGrpSvc.run(new GetCommand(key, revUpperBound));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
- return metaStorageRaftGrpSvc.<Collection<Entry>>run(new GetAllCommand(keys)).
- thenApply(entries -> entries.stream().collect(Collectors.toMap(Entry::key, Function.identity())));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
- return metaStorageRaftGrpSvc.<Collection<Entry>>run(new GetAllCommand(keys, revUpperBound)).
- thenApply(entries -> entries.stream().collect(Collectors.toMap(Entry::key, Function.identity())));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
- return metaStorageRaftGrpSvc.run(new PutCommand(key, value));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, @NotNull byte[] value) {
- return metaStorageRaftGrpSvc.run(new GetAndPutCommand(key, value));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
- return metaStorageRaftGrpSvc.run(new PutAllCommand(vals));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
- List<Key> keys = new ArrayList<>();
- List<byte[]> values = new ArrayList<>();
-
- vals.forEach((key, value) -> {
- keys.add(key);
- values.add(value);
- });
-
- return metaStorageRaftGrpSvc.<Collection<Entry>>run(new GetAndPutAllCommand(keys, values)).
- thenApply(entries -> entries.stream().collect(Collectors.toMap(Entry::key, Function.identity())));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
- return metaStorageRaftGrpSvc.run(new RemoveCommand(key));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
- return metaStorageRaftGrpSvc.run(new GetAndRemoveCommand(key));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
- return metaStorageRaftGrpSvc.run(new RemoveAllCommand(keys));
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
- return metaStorageRaftGrpSvc.<Collection<Entry>>run(new GetAndRemoveAllCommand(keys)).
- thenApply(entries -> entries.stream().collect(Collectors.toMap(Entry::key, Function.identity())));
- }
-
- // TODO: IGNITE-14389 Implement.
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition,
- @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure) {
- return null;
- }
-
- // TODO: IGNITE-14389 Either implement or remove this method.
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition,
- @NotNull Operation success, @NotNull Operation failure) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
- return new CursorImpl<>(
- metaStorageRaftGrpSvc,
- metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo, revUpperBound))
- );
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
- return new CursorImpl<>(
- metaStorageRaftGrpSvc,
- metaStorageRaftGrpSvc.run(new RangeCommand(keyFrom, keyTo))
- );
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<IgniteUuid> watch(
- @Nullable Key keyFrom,
- @Nullable Key keyTo,
- long revision,
- @NotNull WatchListener lsnr
- ) {
- CompletableFuture<IgniteUuid> watchRes =
- metaStorageRaftGrpSvc.run(new WatchRangeKeysCommand(keyFrom, keyTo, revision));
-
- watchRes.thenAccept(
- watchId -> watchProcessor.addWatch(
- watchId,
- new CursorImpl<>(metaStorageRaftGrpSvc, watchRes),
- lsnr
- )
- );
-
- return watchRes;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<IgniteUuid> watch(
- @NotNull Key key,
- long revision,
- @NotNull WatchListener lsnr
- ) {
- return watch(key, null, revision, lsnr);
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<IgniteUuid> watch(
- @NotNull Collection<Key> keys,
- long revision,
- @NotNull WatchListener lsnr
- ) {
- CompletableFuture<IgniteUuid> watchRes =
- metaStorageRaftGrpSvc.run(new WatchExactKeysCommand(keys, revision));
-
- watchRes.thenAccept(
- watchId -> watchProcessor.addWatch(
- watchId,
- new CursorImpl<>(metaStorageRaftGrpSvc, watchRes),
- lsnr
- )
- );
-
- return watchRes;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) {
- return CompletableFuture.runAsync(() -> watchProcessor.stopWatch(id));
- }
-
- // TODO: IGNITE-14389 Implement.
- /** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> compact() {
- return null;
- }
-
- // TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
- /** Watch processor, that manages {@link Watcher} threads. */
- private final class WatchProcessor {
- /** Active Watcher threads that process notification pulling logic. */
- private final Map<IgniteUuid, Watcher> watchers = new ConcurrentHashMap<>();
-
- /**
- * Starts exclusive thread per watch that implement watch pulling logic and
- * calls {@link WatchListener#onUpdate(Iterable)}} or {@link WatchListener#onError(Throwable)}.
- *
- * @param watchId Watch id.
- * @param cursor Watch Cursor.
- * @param lsnr The listener which receives and handles watch updates.
- */
- private void addWatch(IgniteUuid watchId, CursorImpl<WatchEvent> cursor, WatchListener lsnr) {
- Watcher watcher = new Watcher(cursor, lsnr);
-
- watchers.put(watchId, watcher);
-
- watcher.start();
- }
-
- /**
- * Closes server cursor and interrupts watch pulling thread.
- *
- * @param watchId Watch id.
- */
- private void stopWatch(IgniteUuid watchId) {
- watchers.computeIfPresent(
- watchId,
- (k, v) -> {
- CompletableFuture.runAsync(v::interrupt).thenRun(() -> {
- try {
- v.cursor.close();
- }
- catch (InterruptedException e) {
- throw new IgniteInternalException(e);
- }
- catch (Exception e) {
- // TODO: IGNITE-14693 Implement MetaStorage exception handling logic.
- LOG.error("Unexpected exception", e);
- }
- });
- return null;
- }
- );
- }
-
- /** Watcher thread, uses pulling logic in order to retrieve watch notifications from server */
- private final class Watcher extends Thread {
- /** Watch event cursor. */
- private Cursor<WatchEvent> cursor;
-
- /** The listener which receives and handles watch updates. */
- private WatchListener lsnr;
-
- /**
- * @param cursor Watch event cursor.
- * @param lsnr The listener which receives and handles watch updates.
- */
- Watcher(Cursor<WatchEvent> cursor, WatchListener lsnr) {
- this.cursor = cursor;
- this.lsnr = lsnr;
- }
-
- /**
- * Pulls watch events from server side with the help of cursor.iterator.hasNext()/next()
- * in the while(true) loop. Collects watch events with same revision and fires either onUpdate or onError().
- */
- @Override public void run() {
- long rev = -1;
-
- List<WatchEvent> sameRevisionEvts = new ArrayList<>();
-
- Iterator<WatchEvent> watchEvtsIter = cursor.iterator();
-
- while (true) {
- try {
- if (watchEvtsIter.hasNext()) {
- WatchEvent watchEvt = null;
-
- try {
- watchEvt = watchEvtsIter.next();
- }
- catch (Throwable e) {
- lsnr.onError(e);
- }
-
- assert watchEvt != null;
-
- if (watchEvt.newEntry().revision() == rev)
- sameRevisionEvts.add(watchEvt);
- else {
- rev = watchEvt.newEntry().revision();
-
- if (!sameRevisionEvts.isEmpty()) {
- lsnr.onUpdate(sameRevisionEvts);
-
- sameRevisionEvts.clear();
- }
-
- sameRevisionEvts.add(watchEvt);
- }
- }
- else
- Thread.sleep(10);
- }
- catch (Throwable e) {
- if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException)
- break;
- else {
- // TODO: IGNITE-14693 Implement MetaStorage exception handling logic.
- LOG.error("Unexpected exception", e);
- }
- }
- }
- }
- }
- }
-}
diff --git a/modules/metastorage-common/pom.xml b/modules/metastorage-common/pom.xml
index 3a02865..4f31b6c 100644
--- a/modules/metastorage-common/pom.xml
+++ b/modules/metastorage-common/pom.xml
@@ -35,11 +35,6 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
- <artifactId>ignite-raft-client</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
</dependency>
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/DummyEntry.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/DummyEntry.java
deleted file mode 100644
index c4e6ed8..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/DummyEntry.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import org.apache.ignite.metastorage.common.Entry;
-import org.apache.ignite.metastorage.common.Key;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-// TODO: IGNITE-14389 Tmp, used instead of real Entry implementation. Should be removed.
-/**
- * Dummy entry implementation.
- */
-public final class DummyEntry implements Entry, Serializable {
- /** Key. */
- @NotNull private Key key;
-
- /** Value. */
- @Nullable private byte[] val;
-
- /** Revision. */
- private long revision;
-
- /** Update counter. */
- private long updateCntr;
-
- /**
- *
- * @param key Key.
- * @param val Value.
- * @param revision Revision.
- * @param updateCntr Update counter.
- */
- public DummyEntry(@NotNull Key key, @Nullable byte[] val, long revision, long updateCntr) {
- this.key = key;
- this.val = val;
- this.revision = revision;
- this.updateCntr = updateCntr;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Key key() {
- return key;
- }
-
- /** {@inheritDoc} */
- @Override public @Nullable byte[] value() {
- return val;
- }
-
- /** {@inheritDoc} */
- @Override public long revision() {
- return revision;
- }
-
- /** {@inheritDoc} */
- @Override public long updateCounter() {
- return updateCntr;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- DummyEntry entry = (DummyEntry)o;
-
- if (revision != entry.revision)
- return false;
- if (updateCntr != entry.updateCntr)
- return false;
- if (!key.equals(entry.key))
- return false;
- return Arrays.equals(val, entry.val);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = key.hashCode();
- res = 31 * res + Arrays.hashCode(val);
- res = 31 * res + (int)(revision ^ (revision >>> 32));
- res = 31 * res + (int)(updateCntr ^ (updateCntr >>> 32));
- return res;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
deleted file mode 100644
index 853f7de..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAllCommand.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Get all command for MetaStorageCommandListener that retrieves entries for given keys and the revision upper bound, if
- * latter is present.
- */
-public final class GetAllCommand implements ReadCommand {
- /** The collection of keys. */
- @NotNull private final Collection<Key> keys;
-
- /** The upper bound for entry revisions. Must be positive. */
- @Nullable private Long revUpperBound;
-
- /**
- * @param keys The collection of keys. Couldn't be {@code null} or empty. Collection elements couldn't be {@code
- * null}.
- */
- public GetAllCommand(@NotNull Collection<Key> keys) {
- assert !keys.isEmpty();
-
- if (keys instanceof Serializable)
- this.keys = keys;
- else
- this.keys = new ArrayList<>(keys);
- }
-
- /**
- * @param keys The collection of keys. Couldn't be {@code null} or empty. Collection elements couldn't be {@code
- * null}.
- * @param revUpperBound The upper bound for entry revisions. Must be positive.
- */
- public GetAllCommand(@NotNull Collection<Key> keys, @NotNull Long revUpperBound) {
- this(keys);
-
- assert revUpperBound > 0;
-
- this.revUpperBound = revUpperBound;
- }
-
- /**
- * @return The collection of keys.
- */
- public @NotNull Collection<Key> keys() {
- return keys;
- }
-
- /**
- * @return The upper bound for entry revisions. Must be positive.
- */
- public @Nullable Long revision() {
- return revUpperBound;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
deleted file mode 100644
index 7fd3a6e..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutAllCommand.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Get and put all command for MetaStorageCommandListener that inserts or updates entries with given keys and given
- * values and retrieves a previous entries for given keys.
- */
-public final class GetAndPutAllCommand implements WriteCommand {
- /** Keys. */
- @NotNull private final List<Key> keys;
-
- /** Values. */
- @NotNull private final List<byte[]> vals;
-
- /**
- * @param keys Keys.
- * @param vals Values.
- */
- public GetAndPutAllCommand(@NotNull List<Key> keys, @NotNull List<byte[]> vals) {
- assert keys instanceof Serializable;
- assert vals instanceof Serializable;
-
- this.keys = keys;
- this.vals = vals;
- }
-
- /**
- * @return Keys.
- */
- public @NotNull List<Key> keys() {
- return keys;
- }
-
- /**
- * @return Values.
- */
- public @NotNull List<byte[]> vals() {
- return vals;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
deleted file mode 100644
index 202725e..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndPutCommand.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Get and put command for MetaStorageCommandListener that inserts or updates an entry with the given key and the
- * given value and retrieves a previous entry for the given key.
- */
-public final class GetAndPutCommand implements WriteCommand {
- /** The key. Couldn't be {@code null}. */
- @NotNull private final Key key;
-
- /** The value.Couldn't be {@code null}. */
- @NotNull private final byte[] val;
-
- /**
- * @param key The key. Couldn't be {@code null}.
- * @param val The value.Couldn't be {@code null}.
- */
- public GetAndPutCommand(@NotNull Key key, @NotNull byte[] val) {
- this.key = key;
- this.val = val;
- }
-
- /**
- * @return The key. Couldn't be .
- */
- public @NotNull Key key() {
- return key;
- }
-
- /**
- * @return The value.Couldn't be .
- */
- public @NotNull byte[] value() {
- return val;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
deleted file mode 100644
index 070428f..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveAllCommand.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Get and remove all command for MetaStorageCommandListener that removes entries for given keys and retrieves previous
- * entries.
- */
-public final class GetAndRemoveAllCommand implements WriteCommand {
- /** The keys collection. Couldn't be {@code null}. */
- @NotNull private final Collection<Key> keys;
-
- /**
- * @param keys The keys collection. Couldn't be {@code null}.
- */
- public GetAndRemoveAllCommand(@NotNull Collection<Key> keys) {
- assert !keys.isEmpty();
-
- if (keys instanceof Serializable)
- this.keys = keys;
- else
- this.keys = new ArrayList<>(keys);
- }
-
- /**
- * @return The keys collection. Couldn't be .
- */
- public @NotNull Collection<Key> keys() {
- return keys;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
deleted file mode 100644
index ccadd89..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetAndRemoveCommand.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Get and remove command for MetaStorageCommandListener that removes an entry for the given key and retrieves a
- * previous entry for the given key.
- */
-public final class GetAndRemoveCommand implements WriteCommand {
- /** The key. Couldn't be {@code null}. */
- @NotNull private final Key key;
-
- /**
- * @param key The key. Couldn't be {@code null}.
- */
- public GetAndRemoveCommand(@NotNull Key key) {
- this.key = key;
- }
-
- /**
- * @return The key. Couldn't be .
- */
- public @NotNull Key key() {
- return key;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
deleted file mode 100644
index 56168d9..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/GetCommand.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Get command for MetaStorageCommandListener that retrieves an entry for the given key and the revision upper bound, if
- * latter is present.
- */
-public final class GetCommand implements ReadCommand {
- /** Key. */
- @NotNull private final Key key;
-
- /** The upper bound for entry revisions. Must be positive. */
- @Nullable private Long revUpperBound;
-
- /**
- * @param key Key. Couldn't be {@code null}.
- */
- public GetCommand(@NotNull Key key) {
- this.key = key;
- }
-
- /**
- * @param key Key. Couldn't be {@code null}.
- * @param revUpperBound The upper bound for entry revisions. Must be positive.
- */
- public GetCommand(@NotNull Key key, @NotNull Long revUpperBound) {
- this.key = key;
-
- assert revUpperBound > 0;
-
- this.revUpperBound = revUpperBound;
- }
-
- /**
- * @return Key.
- */
- public @NotNull Key key() {
- return key;
- }
-
- /**
- * @return The upper bound for entry revisions, or {@code null} if wasn't specified.
- */
- public @Nullable Long revision() {
- return revUpperBound;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
deleted file mode 100644
index 216fb09..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutAllCommand.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Put all command for MetaStorageCommandListener that inserts or updates entries with given keys and given values.
- */
-public final class PutAllCommand implements WriteCommand {
- /** The map of keys and corresponding values. Couldn't be {@code null} or empty. */
- @NotNull private final Map<Key, byte[]> vals;
-
- /**
- * @param vals he map of keys and corresponding values. Couldn't be {@code null} or empty.
- */
- public PutAllCommand(@NotNull Map<Key, byte[]> vals) {
- assert !vals.isEmpty();
-
- if (vals instanceof Serializable)
- this.vals = vals;
- else
- this.vals = new HashMap<>(vals);
- }
-
- /**
- * @return The map of keys and corresponding values. Couldn't be {@code null} or empty.
- */
- public @NotNull Map<Key, byte[]> values() {
- return vals;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
deleted file mode 100644
index da30b3a..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/PutCommand.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Put command for MetaStorageCommandListener that inserts or updates an entry with the given key and the given value
- * and retrieves a previous entry for the given key.
- */
-public final class PutCommand implements WriteCommand {
- /** The key. Couldn't be {@code null}. */
- @NotNull private final Key key;
-
- /** The value.Couldn't be {@code null}. */
- @NotNull private final byte[] val;
-
- /**
- * @param key The key. Couldn't be {@code null}.
- * @param val The value.Couldn't be {@code null}.
- */
- public PutCommand(@NotNull Key key, @NotNull byte[] val) {
- this.key = key;
- this.val = val;
- }
-
- /**
- * @return The key. Couldn't be {@code null}.
- */
- public @NotNull Key key() {
- return key;
- }
-
- /**
- * @return The value.Couldn't be {@code null}.
- */
- public @NotNull byte[] value() {
- return val;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
deleted file mode 100644
index d046bd5..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RangeCommand.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Range command for MetaStorageCommandListener that retrieves entries for the given key range in lexicographic order.
- * Entries will be filtered out by upper bound of given revision number.
- */
-public final class RangeCommand implements WriteCommand {
-
- /** Start key of range (inclusive). Couldn't be {@code null}. */
- @NotNull private final Key keyFrom;
-
- /** End key of range (exclusive). Could be {@code null}. */
- @Nullable private final Key keyTo;
-
- /** The upper bound for entry revision. {@code -1} means latest revision. */
- @NotNull private final Long revUpperBound;
-
- /**
- * @param keyFrom Start key of range (inclusive).
- * @param keyTo End key of range (exclusive).
- */
- public RangeCommand(@NotNull Key keyFrom, @Nullable Key keyTo) {
- this(keyFrom, keyTo, -1L);
- }
-
- /**
- * @param keyFrom Start key of range (inclusive).
- * @param keyTo End key of range (exclusive).
- * @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision.
- */
- public RangeCommand(
- @NotNull Key keyFrom,
- @Nullable Key keyTo,
- @NotNull Long revUpperBound
- ) {
- this.keyFrom = keyFrom;
- this.keyTo = keyTo;
- this.revUpperBound = revUpperBound;
- }
-
- /**
- * @return Start key of range (inclusive). Couldn't be {@code null}.
- */
- public @NotNull Key keyFrom() {
- return keyFrom;
- }
-
- /**
- * @return End key of range (exclusive). Could be {@code null}.
- */
- public @Nullable Key keyTo() {
- return keyTo;
- }
-
- /**
- * @return The upper bound for entry revision. {@code -1} means latest revision.
- */
- public @NotNull Long revUpperBound() {
- return revUpperBound;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
deleted file mode 100644
index 50c8c71..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveAllCommand.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Remove all command for MetaStorageCommandListener that removes entries for given keys.
- */
-public final class RemoveAllCommand implements WriteCommand {
- /** The keys collection. Couldn't be {@code null}. */
- @NotNull private final Collection<Key> keys;
-
- /**
- * @param keys The keys collection. Couldn't be {@code null}.
- */
- public RemoveAllCommand(@NotNull Collection<Key> keys) {
- if (keys instanceof Serializable)
- this.keys = keys;
- else
- this.keys = new ArrayList<>(keys);
- }
-
- /**
- * @return The keys collection. Couldn't be {@code null}.
- */
- public @NotNull Collection<Key> keys() {
- return keys;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
deleted file mode 100644
index fb04713..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/RemoveCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Remove command for MetaStorageCommandListener that removes an entry for the given key.
- */
-public final class RemoveCommand implements WriteCommand {
- /** The key. Couldn't be {@code null}. */
- @NotNull private final Key key;
-
- /**
- * @param key he key. Couldn't be {@code null}.
- */
- public RemoveCommand(@NotNull Key key) {
- this.key = key;
- }
-
- /**
- * @return The key. Couldn't be {@code null}.
- */
- public @NotNull Key key() {
- return key;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
deleted file mode 100644
index 2ca3abc..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchExactKeysCommand.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Watch command for MetaStorageCommandListener that subscribes on meta storage updates matching the parameters.
- */
-public final class WatchExactKeysCommand implements WriteCommand {
- /** The keys collection. Couldn't be {@code null}. */
- @NotNull private final Collection<Key> keys;
-
- /** Start revision inclusive. {@code 0} - all revisions. */
- @NotNull private final Long revision;
-
- /**
- * @param keys The keys collection. Couldn't be {@code null}.
- * @param revision Start revision inclusive. {@code 0} - all revisions.
- */
- public WatchExactKeysCommand(@NotNull Collection<Key> keys, @NotNull Long revision) {
- if (keys instanceof Serializable)
- this.keys = keys;
- else
- this.keys = new ArrayList<>(keys);
-
- this.revision = revision;
- }
-
- /**
- * @return The keys collection. Couldn't be {@code null}.
- */
- public @NotNull Collection<Key> keys() {
- return keys;
- }
-
- /**
- * @return Start revision inclusive. {@code 0} - all revisions.
- */
- public @NotNull Long revision() {
- return revision;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
deleted file mode 100644
index fea2bf4..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/WatchRangeKeysCommand.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command;
-
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * Watch command for MetaStorageCommandListener that subscribes on meta storage updates matching the parameters.
- */
-public final class WatchRangeKeysCommand implements WriteCommand {
- /** Start key of range (inclusive). Couldn't be {@code null}. */
- @Nullable private final Key keyFrom;
-
- /** End key of range (exclusive). Could be {@code null}. */
- @Nullable private final Key keyTo;
-
- /** Start revision inclusive. {@code 0} - all revisions. */
- @NotNull private final Long revision;
-
- /**
- * @param keyFrom Start key of range (inclusive).
- * @param keyTo End key of range (exclusive).
- */
- public WatchRangeKeysCommand(@Nullable Key keyFrom, @Nullable Key keyTo) {
- this(keyFrom, keyTo, 0L);
- }
-
- /**
- * @param keyFrom Start key of range (inclusive).
- * @param keyTo End key of range (exclusive).
- * @param revision Start revision inclusive. {@code 0} - all revisions.
- */
- public WatchRangeKeysCommand(
- @Nullable Key keyFrom,
- @Nullable Key keyTo,
- @NotNull Long revision
- ) {
- this.keyFrom = keyFrom;
- this.keyTo = keyTo;
- this.revision = revision;
- }
-
- /**
- * @return Start key of range (inclusive). Couldn't be {@code null}.
- */
- public @Nullable Key keyFrom() {
- return keyFrom;
- }
-
- /**
- * @return End key of range (exclusive). Could be {@code null}.
- */
- public @Nullable Key keyTo() {
- return keyTo;
- }
-
- /**
- * @return Start revision inclusive. {@code 0} - all revisions.
- */
- public @NotNull Long revision() {
- return revision;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java
deleted file mode 100644
index aae9c32..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorCloseCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command.cursor;
-
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Cursor close command for MetaStorageCommandListener that closes cursor with given id.
- */
-public class CursorCloseCommand implements WriteCommand {
- /** Cursor id. */
- @NotNull private final IgniteUuid cursorId;
-
- /**
- * @param cursorId Cursor id.
- */
- public CursorCloseCommand(@NotNull IgniteUuid cursorId) {
- this.cursorId = cursorId;
- }
-
- /**
- * @return Cursor id.
- */
- public @NotNull IgniteUuid cursorId() {
- return cursorId;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java
deleted file mode 100644
index 74c19d7..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorHasNextCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command.cursor;
-
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Cursor hasNext command for MetaStorageCommandListener that checks whether next element is available.
- */
-public class CursorHasNextCommand implements ReadCommand {
- /** Cursor id. */
- @NotNull private final IgniteUuid cursorId;
-
- /**
- * @param cursorId Cursor id.
- */
- public CursorHasNextCommand(@NotNull IgniteUuid cursorId) {
- this.cursorId = cursorId;
- }
-
- /**
- * @return Cursor id.
- */
- public @NotNull IgniteUuid cursorId() {
- return cursorId;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java b/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java
deleted file mode 100644
index 5a2f02c..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/internal/metastorage/common/command/cursor/CursorNextCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.common.command.cursor;
-
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Cursor next command for MetaStorageCommandListener that returns next element and moves cursor.
- */
-public class CursorNextCommand implements WriteCommand {
- /** Cursor id. */
- @NotNull private final IgniteUuid cursorId;
-
- /**
- * @param cursorId Cursor id.
- */
- public CursorNextCommand(@NotNull IgniteUuid cursorId) {
- this.cursorId = cursorId;
- }
-
- /**
- * @return Cursor id.
- */
- public @NotNull IgniteUuid cursorId() {
- return cursorId;
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java
index 7f653cc..e7391ba 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java
@@ -49,11 +49,4 @@ public interface Entry {
* @return Revision.
*/
long revision();
-
- /**
- * Returns an update counter.
- *
- * @return Update counter.
- */
- long updateCounter();
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Key.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Key.java
index 7cba39f..d497021 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Key.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Key.java
@@ -17,7 +17,6 @@
package org.apache.ignite.metastorage.common;
-import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.jetbrains.annotations.NotNull;
@@ -25,7 +24,7 @@ import org.jetbrains.annotations.NotNull;
/**
* A wrapper for meta storage key represented by byte array.
*/
-public final class Key implements Comparable<Key>, Serializable {
+public final class Key implements Comparable<Key> {
/** Byte-wise representation of the key. */
@NotNull
private final byte[] arr;
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorage.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorage.java
deleted file mode 100644
index 6155112..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorage.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.metastorage.common;
-
-import java.util.Collection;
-import java.util.List;
-import org.jetbrains.annotations.NotNull;
-
-// TODO: IGNITE-14389 Tmp, used instead of real KeyValueStorage interface from metastorage-server module.
-/**
- *
- */
-@SuppressWarnings("unused") public interface KeyValueStorage {
- /** */
- long revision();
-
- /** */
- long updateCounter();
-
- /** */
- @NotNull Entry get(byte[] key);
-
- /** */
- @NotNull Entry get(byte[] key, long rev);
-
- /** */
- @NotNull Collection<Entry> getAll(List<byte[]> keys);
-
- /** */
- @NotNull Collection<Entry> getAll(List<byte[]> keys, long revUpperBound);
-
- /** */
- void put(byte[] key, byte[] value);
-
- /** */
- @NotNull Entry getAndPut(byte[] key, byte[] value);
-
- /** */
- void putAll(List<byte[]> keys, List<byte[]> values);
-
- /** */
- @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values);
-
- /** */
- void remove(byte[] key);
-
- /** */
- @NotNull Entry getAndRemove(byte[] key);
-
- /** */
- void removeAll(List<byte[]> key);
-
- /** */
- @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys);
-
- /** */
- boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure);
-
- /** */
- Cursor<Entry> range(byte[] keyFrom, byte[] keyTo);
-
- /** */
- Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound);
-
- /** */
- Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev);
-
- /** */
- Cursor<WatchEvent> watch(byte[] key, long rev);
-
- /** */
- Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev);
-
- /** */
- void compact();
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java
deleted file mode 100644
index 8708731..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/KeyValueStorageImpl.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.metastorage.common;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import org.apache.ignite.internal.metastorage.common.DummyEntry;
-import org.jetbrains.annotations.NotNull;
-
-// TODO: IGNITE-14389 Tmp, should be removed.
-/**
- *
- */
-@SuppressWarnings("ConstantConditions")
-public class KeyValueStorageImpl implements KeyValueStorage {
- /** {@inheritDoc} */
- @Override public long revision() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public long updateCounter() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Entry get(byte[] key) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Entry get(byte[] key, long rev) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Collection<Entry> getAll(List<byte[]> keys) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Collection<Entry> getAll(List<byte[]> keys, long revUpperBound) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void put(byte[] key, byte[] value) {
-
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Entry getAndPut(byte[] key, byte[] value) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void putAll(List<byte[]> keys, List<byte[]> values) {
-
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Collection<Entry> getAndPutAll(List<byte[]> keys, List<byte[]> values) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void remove(byte[] key) {
-
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Entry getAndRemove(byte[] key) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void removeAll(List<byte[]> keys) {
-
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<WatchEvent> watch(byte[] keyFrom, byte[] keyTo, long rev) {
- return new Cursor<>() {
- /** {@inheritDoc} */
- @Override public void close(){
-
- }
-
- /** {@inheritDoc} */
- @NotNull @Override public Iterator<WatchEvent> iterator() {
- return new Iterator<>() {
- @Override public boolean hasNext() {
- return true;
- }
-
- @Override public WatchEvent next() {
- return new WatchEvent(
- new DummyEntry(
- new Key(new byte[] {1}),
- new byte[] {2},
- 1L,
- 1L
- ),
- new DummyEntry(
- new Key(new byte[] {1}),
- new byte[] {3},
- 2L,
- 2L
- )
- );
- }
- };
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<WatchEvent> watch(byte[] key, long rev) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Cursor<WatchEvent> watch(Collection<byte[]> keys, long rev) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void compact() {
-
- }
-}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/WatchEvent.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/WatchEvent.java
index 119167a..bd78ea5 100644
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/WatchEvent.java
+++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/WatchEvent.java
@@ -17,18 +17,15 @@
package org.apache.ignite.metastorage.common;
-import java.io.Serializable;
-import org.jetbrains.annotations.NotNull;
-
/**
* Watch event which can be processed by {@link WatchListener}.
*/
-public final class WatchEvent implements Serializable {
+public final class WatchEvent {
/** Old (previous) entry */
- @NotNull private final Entry oldEntry;
+ private final Entry oldEntry;
/** New (updated) entry. */
- @NotNull private final Entry newEntry;
+ private final Entry newEntry;
/**
* Constructs an event with given old and new entries.
@@ -36,7 +33,7 @@ public final class WatchEvent implements Serializable {
* @param oldEntry Old entry.
* @param newEntry New entry/
*/
- public WatchEvent(@NotNull Entry oldEntry, @NotNull Entry newEntry) {
+ public WatchEvent(Entry oldEntry, Entry newEntry) {
this.oldEntry = oldEntry;
this.newEntry = newEntry;
}
@@ -46,7 +43,7 @@ public final class WatchEvent implements Serializable {
*
* @return Old entry.
*/
- public @NotNull Entry oldEntry() {
+ public Entry oldEntry() {
return oldEntry;
}
@@ -55,28 +52,7 @@ public final class WatchEvent implements Serializable {
*
* @return New entry.
*/
- public @NotNull Entry newEntry() {
+ public Entry newEntry() {
return newEntry;
}
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
-
- WatchEvent that = (WatchEvent)o;
-
- if (!oldEntry.equals(that.oldEntry))
- return false;
- return newEntry.equals(that.newEntry);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = oldEntry.hashCode();
- res = 31 * res + newEntry.hashCode();
- return res;
- }
}
diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/raft/MetaStorageCommandListener.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/raft/MetaStorageCommandListener.java
deleted file mode 100644
index 932ab59..0000000
--- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/raft/MetaStorageCommandListener.java
+++ /dev/null
@@ -1,297 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.metastorage.common.raft;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Collectors;
-import org.apache.ignite.internal.metastorage.common.command.GetAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetAndPutAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetAndPutCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetAndRemoveCommand;
-import org.apache.ignite.internal.metastorage.common.command.GetCommand;
-import org.apache.ignite.internal.metastorage.common.command.PutAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.PutCommand;
-import org.apache.ignite.internal.metastorage.common.command.RangeCommand;
-import org.apache.ignite.internal.metastorage.common.command.RemoveAllCommand;
-import org.apache.ignite.internal.metastorage.common.command.RemoveCommand;
-import org.apache.ignite.internal.metastorage.common.command.WatchExactKeysCommand;
-import org.apache.ignite.internal.metastorage.common.command.WatchRangeKeysCommand;
-import org.apache.ignite.internal.metastorage.common.command.cursor.CursorCloseCommand;
-import org.apache.ignite.internal.metastorage.common.command.cursor.CursorHasNextCommand;
-import org.apache.ignite.internal.metastorage.common.command.cursor.CursorNextCommand;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.metastorage.common.CompactedException;
-import org.apache.ignite.metastorage.common.Cursor;
-import org.apache.ignite.metastorage.common.Entry;
-import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.metastorage.common.KeyValueStorage;
-import org.apache.ignite.metastorage.common.OperationTimeoutException;
-import org.apache.ignite.metastorage.common.WatchEvent;
-import org.apache.ignite.raft.client.ReadCommand;
-import org.apache.ignite.raft.client.WriteCommand;
-import org.apache.ignite.raft.client.service.CommandClosure;
-import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Meta storage command listener aka mata storage raft state machine.
- */
-public class MetaStorageCommandListener implements RaftGroupCommandListener {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(MetaStorageCommandListener.class);
-
- /** Storage. */
- private final KeyValueStorage storage;
-
- /** Cursors map. */
- private final Map<IgniteUuid, IgniteBiTuple<@NotNull Cursor, @NotNull Iterator>> cursors;
-
- /**
- * @param storage Storage.
- */
- public MetaStorageCommandListener(KeyValueStorage storage) {
- this.storage = storage;
- this.cursors = new ConcurrentHashMap<>();
- }
-
- /** {@inheritDoc} */
- @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iter) {
- while (iter.hasNext()) {
- CommandClosure<ReadCommand> clo = iter.next();
-
- try {
- if (clo.command() instanceof GetCommand) {
- GetCommand getCmd = (GetCommand)clo.command();
-
- if (getCmd.revision() != null)
- clo.success(storage.get(getCmd.key().bytes(), getCmd.revision()));
- else
- clo.success(storage.get(getCmd.key().bytes()));
- }
- else if (clo.command() instanceof GetAllCommand) {
- GetAllCommand getAllCmd = (GetAllCommand)clo.command();
-
- if (getAllCmd.revision() != null) {
- clo.success(storage.getAll(
- getAllCmd.keys().stream().map(Key::bytes).collect(Collectors.toList()),
- getAllCmd.revision())
- );
- }
- else {
- clo.success(storage.getAll(
- getAllCmd.keys().stream().map(Key::bytes).collect(Collectors.toList()))
- );
- }
- }
- else if (clo.command() instanceof CursorHasNextCommand) {
- CursorHasNextCommand cursorHasNextCmd = (CursorHasNextCommand)clo.command();
-
- assert cursors.containsKey(cursorHasNextCmd.cursorId());
-
- clo.success(cursors.get(cursorHasNextCmd.cursorId()).getValue().hasNext());
- }
- else
- assert false : "Command was not found [cmd=" + clo.command() + ']';
- }
- catch (CompactedException | OperationTimeoutException e) {
- // TODO: IGNITE-14693 Implement MetaStorage exception handling logic.
- LOG.warn("Unable to evaluate command [cmd=" + clo.command() + ']', e);
-
- clo.failure(e);
- }
- catch (Throwable e) {
- LOG.error("Unable to evaluate command [cmd=" + clo.command() + ']', e);
-
- clo.failure(e);
- }
- }
- }
-
- /** {@inheritDoc} */
- @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iter) {
- while (iter.hasNext()) {
- CommandClosure<WriteCommand> clo = iter.next();
-
- try {
- if (clo.command() instanceof PutCommand) {
- PutCommand putCmd = (PutCommand)clo.command();
-
- storage.put(putCmd.key().bytes(), putCmd.value());
-
- clo.success(null);
- }
- else if (clo.command() instanceof GetAndPutCommand) {
- GetAndPutCommand getAndPutCmd = (GetAndPutCommand)clo.command();
-
- clo.success(storage.getAndPut(getAndPutCmd.key().bytes(), getAndPutCmd.value()));
- }
- else if (clo.command() instanceof PutAllCommand) {
- PutAllCommand putAllCmd = (PutAllCommand)clo.command();
-
- storage.putAll(
- putAllCmd.values().keySet().stream().map(Key::bytes).collect(Collectors.toList()),
- new ArrayList<>(putAllCmd.values().values()));
-
- clo.success(null);
- }
- else if (clo.command() instanceof GetAndPutAllCommand) {
- GetAndPutAllCommand getAndPutAllCmd = (GetAndPutAllCommand)clo.command();
-
- Collection<Entry> entries = storage.getAndPutAll(
- getAndPutAllCmd.keys().stream().map(Key::bytes).collect(Collectors.toList()),
- getAndPutAllCmd.vals()
- );
-
- if (!(entries instanceof Serializable))
- entries = new ArrayList<>(entries);
-
- clo.success(entries);
- }
- else if (clo.command() instanceof RemoveCommand) {
- RemoveCommand rmvCmd = (RemoveCommand)clo.command();
-
- storage.remove(rmvCmd.key().bytes());
-
- clo.success(null);
- }
- else if (clo.command() instanceof GetAndRemoveCommand) {
- GetAndRemoveCommand getAndRmvCmd = (GetAndRemoveCommand)clo.command();
-
- clo.success(storage.getAndRemove(getAndRmvCmd.key().bytes()));
- }
- else if (clo.command() instanceof RemoveAllCommand) {
- RemoveAllCommand rmvAllCmd = (RemoveAllCommand)clo.command();
-
- storage.removeAll(rmvAllCmd.keys().stream().map(Key::bytes).collect(Collectors.toList()));
-
- clo.success(null);
- }
- else if (clo.command() instanceof GetAndRemoveAllCommand) {
- GetAndRemoveAllCommand getAndRmvAllCmd = (GetAndRemoveAllCommand)clo.command();
-
- Collection<Entry> entries = storage.getAndRemoveAll(
- getAndRmvAllCmd.keys().stream().map(Key::bytes).collect(Collectors.toList())
- );
-
- if (!(entries instanceof Serializable))
- entries = new ArrayList<>(entries);
-
- clo.success(entries);
- }
- else if (clo.command() instanceof RangeCommand) {
- RangeCommand rangeCmd = (RangeCommand)clo.command();
-
- IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
-
- Cursor<Entry> cursor = storage.range(
- rangeCmd.keyFrom().bytes(),
- rangeCmd.keyTo() == null ? null : rangeCmd.keyTo().bytes(),
- rangeCmd.revUpperBound()
- );
-
- cursors.put(
- cursorId,
- new IgniteBiTuple<>(cursor, cursor.iterator())
- );
-
- clo.success(cursorId);
- }
- else if (clo.command() instanceof CursorNextCommand) {
- CursorNextCommand cursorNextCmd = (CursorNextCommand)clo.command();
-
- assert cursors.containsKey(cursorNextCmd.cursorId());
-
- clo.success(cursors.get(cursorNextCmd.cursorId()).getValue().next());
- }
- else if (clo.command() instanceof CursorCloseCommand) {
- CursorCloseCommand cursorCloseCmd = (CursorCloseCommand)clo.command();
-
- cursors.computeIfPresent(cursorCloseCmd.cursorId(), (k, v) -> {
- try {
- v.getKey().close();
- }
- catch (Exception e) {
- LOG.error("Unable to close cursor during command evaluation " +
- "[cmd=" + clo.command() + ", cursor=" + cursorCloseCmd.cursorId() + ']', e);
-
- clo.failure(e);
- }
- return null;
- });
-
- clo.success(null);
- }
- else if (clo.command() instanceof WatchRangeKeysCommand) {
- WatchRangeKeysCommand watchCmd = (WatchRangeKeysCommand)clo.command();
-
- IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
-
- Cursor<WatchEvent> cursor = storage.watch(
- watchCmd.keyFrom() == null ? null : watchCmd.keyFrom().bytes(),
- watchCmd.keyTo() == null ? null : watchCmd.keyTo().bytes(),
- watchCmd.revision());
-
- cursors.put(
- cursorId,
- new IgniteBiTuple<>(cursor, cursor.iterator())
- );
-
- clo.success(cursorId);
- }
- else if (clo.command() instanceof WatchExactKeysCommand) {
- WatchExactKeysCommand watchCmd = (WatchExactKeysCommand)clo.command();
-
- IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
-
- Cursor<WatchEvent> cursor = storage.watch(
- watchCmd.keys().stream().map(Key::bytes).collect(Collectors.toList()),
- watchCmd.revision());
-
- cursors.put(
- cursorId,
- new IgniteBiTuple<>(cursor, cursor.iterator())
- );
-
- clo.success(cursorId);
- }
- else
- assert false : "Command was not found [cmd=" + clo.command() + ']';
- }
- catch (CompactedException | OperationTimeoutException e) {
- // TODO: IGNITE-14693 Implement MetaStorage exception handling logic.
- LOG.warn("Unable to evaluate command [cmd=" + clo.command() + ']', e);
-
- clo.failure(e);
- }
- catch (Throwable e) {
- LOG.error("Unable to evaluate command [cmd=" + clo.command() + ']', e);
-
- clo.failure(e);
- }
- }
- }
-}
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index e59aa31..669b937 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -17,19 +17,12 @@
package org.apache.ignite.internal.metastorage;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.function.Predicate;
import java.util.stream.Collectors;
-import org.apache.ignite.configuration.internal.ConfigurationManager;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
-import org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl;
import org.apache.ignite.internal.metastorage.watch.WatchAggregator;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.vault.VaultManager;
@@ -44,12 +37,9 @@ import org.apache.ignite.metastorage.common.Condition;
import org.apache.ignite.metastorage.common.Cursor;
import org.apache.ignite.metastorage.common.Entry;
import org.apache.ignite.metastorage.common.Key;
-import org.apache.ignite.metastorage.common.KeyValueStorageImpl;
import org.apache.ignite.metastorage.common.Operation;
import org.apache.ignite.metastorage.common.OperationTimeoutException;
import org.apache.ignite.metastorage.common.WatchListener;
-import org.apache.ignite.metastorage.common.raft.MetaStorageCommandListener;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -63,10 +53,7 @@ import org.jetbrains.annotations.Nullable;
* </ul>
*/
// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings("unused") public class MetaStorageManager {
- /** MetaStorage raft group name. */
- private static final String METASTORAGE_RAFT_GROUP_NAME = "metastorage_raft_group";
-
+@SuppressWarnings({"FieldCanBeLocal", "unused", "WeakerAccess"}) public class MetaStorageManager {
/** Vault manager in order to commit processed watches with corresponding applied revision. */
private final VaultManager vaultMgr;
@@ -77,7 +64,7 @@ import org.jetbrains.annotations.Nullable;
private final Loza raftMgr;
/** Meta storage service. */
- private CompletableFuture<MetaStorageService> metaStorageSvcFut;
+ private MetaStorageService metaStorageSvc;
/**
* Aggregator of multiple watches to deploy them as one batch.
@@ -110,51 +97,17 @@ import org.jetbrains.annotations.Nullable;
*/
public MetaStorageManager(
VaultManager vaultMgr,
- ConfigurationManager locCfgMgr,
ClusterService clusterNetSvc,
- Loza raftMgr
+ Loza raftMgr,
+ MetaStorageService metaStorageSvc
) {
this.vaultMgr = vaultMgr;
this.clusterNetSvc = clusterNetSvc;
this.raftMgr = raftMgr;
+ this.metaStorageSvc = metaStorageSvc;
watchAggregator = new WatchAggregator();
deployFut = new CompletableFuture<>();
- String locNodeName = locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
- .name().value();
-
- String[] metastorageNodes = locCfgMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
- .metastorageNodes().value();
-
- Predicate<ClusterNode> metaStorageNodesContainsLocPred =
- clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
-
- if (hasMetastorageLocally(locNodeName, metastorageNodes)) {
- this.metaStorageSvcFut = CompletableFuture.completedFuture(new MetaStorageServiceImpl(
- raftMgr.startRaftGroup(
- METASTORAGE_RAFT_GROUP_NAME,
- clusterNetSvc.topologyService().allMembers().stream().filter(
- metaStorageNodesContainsLocPred).
- collect(Collectors.toList()),
- new MetaStorageCommandListener(new KeyValueStorageImpl())
- )
- )
- );
- }
- else if (metastorageNodes.length > 0) {
- this.metaStorageSvcFut = CompletableFuture.completedFuture(new MetaStorageServiceImpl(
- raftMgr.startRaftService(
- METASTORAGE_RAFT_GROUP_NAME,
- clusterNetSvc.topologyService().allMembers().stream().filter(
- metaStorageNodesContainsLocPred).
- collect(Collectors.toList())
- )
- )
- );
- }
- else
- this.metaStorageSvcFut = new CompletableFuture<>();
-
// TODO: IGNITE-14088: Uncomment and use real serializer factory
// Arrays.stream(MetaStorageMessageTypes.values()).forEach(
// msgTypeInstance -> net.registerMessageMapper(
@@ -163,8 +116,9 @@ import org.jetbrains.annotations.Nullable;
// )
// );
- // TODO: IGNITE-14414 Cluster initialization flow. Here we should complete metaStorageServiceFuture.
- clusterNetSvc.messagingService().addMessageHandler((message, sender, correlationId) -> {});
+ clusterNetSvc.messagingService().addMessageHandler((message, sender, correlationId) -> {
+ // TODO: IGNITE-14414 Cluster initialization flow.
+ });
}
/**
@@ -179,11 +133,11 @@ import org.jetbrains.annotations.Nullable;
if (watch.isEmpty())
deployFut.complete(Optional.empty());
else
- metaStorageSvcFut.thenApply(svc -> svc.watch(
+ metaStorageSvc.watch(
watch.get().keyCriterion().toRange().getKey(),
watch.get().keyCriterion().toRange().getValue(),
watch.get().revision(),
- watch.get().listener()).thenAccept(id -> deployFut.complete(Optional.of(id))).join());
+ watch.get().listener()).thenAccept(id -> deployFut.complete(Optional.of(id))).join();
}
catch (IgniteInternalCheckedException e) {
throw new IgniteInternalException("Couldn't receive applied revision during deploy watches", e);
@@ -271,84 +225,84 @@ import org.jetbrains.annotations.Nullable;
* @see MetaStorageService#get(Key)
*/
public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
- return metaStorageSvcFut.thenCompose(svc -> svc.get(key));
+ return metaStorageSvc.get(key);
}
/**
* @see MetaStorageService#get(Key, long)
*/
public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
- return metaStorageSvcFut.thenCompose(svc -> svc.get(key, revUpperBound));
+ return metaStorageSvc.get(key, revUpperBound);
}
/**
* @see MetaStorageService#getAll(Collection)
*/
public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys));
+ return metaStorageSvc.getAll(keys);
}
/**
* @see MetaStorageService#getAll(Collection, long)
*/
public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAll(keys, revUpperBound));
+ return metaStorageSvc.getAll(keys, revUpperBound);
}
/**
* @see MetaStorageService#put(Key, byte[])
*/
public @NotNull CompletableFuture<Void> put(@NotNull Key key, byte[] val) {
- return metaStorageSvcFut.thenCompose(svc -> svc.put(key, val));
+ return metaStorageSvc.put(key, val);
}
/**
* @see MetaStorageService#getAndPut(Key, byte[])
*/
public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, byte[] val) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndPut(key, val));
+ return metaStorageSvc.getAndPut(key, val);
}
/**
* @see MetaStorageService#putAll(Map)
*/
public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
- return metaStorageSvcFut.thenCompose(svc -> svc.putAll(vals));
+ return metaStorageSvc.putAll(vals);
}
/**
* @see MetaStorageService#getAndPutAll(Map)
*/
public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndPutAll(vals));
+ return metaStorageSvc.getAndPutAll(vals);
}
/**
* @see MetaStorageService#remove(Key)
*/
public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
- return metaStorageSvcFut.thenCompose(svc -> svc.remove(key));
+ return metaStorageSvc.remove(key);
}
/**
* @see MetaStorageService#getAndRemove(Key)
*/
public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemove(key));
+ return metaStorageSvc.getAndRemove(key);
}
/**
* @see MetaStorageService#removeAll(Collection)
*/
public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
- return metaStorageSvcFut.thenCompose(svc -> svc.removeAll(keys));
+ return metaStorageSvc.removeAll(keys);
}
/**
* @see MetaStorageService#getAndRemoveAll(Collection)
*/
public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndRemoveAll(keys));
+ return metaStorageSvc.getAndRemoveAll(keys);
}
/**
@@ -361,7 +315,7 @@ import org.jetbrains.annotations.Nullable;
@NotNull Operation success,
@NotNull Operation failure
) {
- return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, Collections.singletonList(success), Collections.singletonList(failure)));
+ return metaStorageSvc.invoke(cond, Collections.singletonList(success), Collections.singletonList(failure));
}
/**
@@ -372,7 +326,7 @@ import org.jetbrains.annotations.Nullable;
@NotNull Collection<Operation> success,
@NotNull Collection<Operation> failure
) {
- return metaStorageSvcFut.thenCompose(svc -> svc.invoke(cond, success, failure));
+ return metaStorageSvc.invoke(cond, success, failure);
}
/**
@@ -384,17 +338,14 @@ import org.jetbrains.annotations.Nullable;
@NotNull Operation success,
@NotNull Operation failure
) {
- return metaStorageSvcFut.thenCompose(svc -> svc.getAndInvoke(key, cond, success, failure));
+ return metaStorageSvc.getAndInvoke(key, cond, success, failure);
}
/**
* @see MetaStorageService#range(Key, Key, long)
*/
public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
- return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo, revUpperBound))
- );
+ return metaStorageSvc.range(keyFrom, keyTo, revUpperBound);
}
/**
@@ -411,34 +362,26 @@ import org.jetbrains.annotations.Nullable;
* @see Entry
*/
public @NotNull Cursor<Entry> rangeWithAppliedRevision(@NotNull Key keyFrom, @Nullable Key keyTo) {
- return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> {
- try {
- return svc.range(keyFrom, keyTo, vaultMgr.appliedRevision());
- }
- catch (IgniteInternalCheckedException e) {
- throw new IgniteInternalException(e);
- }
- })
- );
+ try {
+ return metaStorageSvc.range(keyFrom, keyTo, vaultMgr.appliedRevision());
+ }
+ catch (IgniteInternalCheckedException e) {
+ throw new IgniteInternalException(e);
+ }
}
/**
* @see MetaStorageService#range(Key, Key)
*/
public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
- return new CursorWrapper<>(
- metaStorageSvcFut,
- metaStorageSvcFut.thenApply(svc -> svc.range(keyFrom, keyTo))
- );
+ return metaStorageSvc.range(keyFrom, keyTo);
}
/**
* @see MetaStorageService#compact()
*/
public @NotNull CompletableFuture<Void> compact() {
- return metaStorageSvcFut.thenCompose(MetaStorageService::compact);
+ return metaStorageSvc.compact();
}
/**
@@ -458,20 +401,19 @@ import org.jetbrains.annotations.Nullable;
final var finalRevision = revision;
deployFut = deployFut
- .thenCompose(idOpt -> idOpt.map(id -> metaStorageSvcFut.thenCompose(svc -> svc.stopWatch(id))).
- orElse(CompletableFuture.completedFuture(null))
+ .thenCompose(idOpt -> idOpt.map(metaStorageSvc::stopWatch).orElse(CompletableFuture.completedFuture(null)))
.thenCompose(r -> {
var watch = watchAggregator.watch(finalRevision, this::storeEntries);
if (watch.isEmpty())
return CompletableFuture.completedFuture(Optional.empty());
else
- return metaStorageSvcFut.thenCompose(svc -> svc.watch(
+ return metaStorageSvc.watch(
watch.get().keyCriterion().toRange().get1(),
watch.get().keyCriterion().toRange().get2(),
watch.get().revision(),
- watch.get().listener()).thenApply(Optional::of));
- }));
+ watch.get().listener()).thenApply(Optional::of);
+ });
return deployFut;
}
@@ -506,89 +448,4 @@ import org.jetbrains.annotations.Nullable;
else
return deployFut.thenApply(uid -> id);
}
-
- /**
- * Checks whether the local node hosts Metastorage.
- *
- * @param locNodeName Local node uniq name.
- * @param metastorageMembers Metastorage members names.
- * @return True if the node has Metastorage, false otherwise.
- */
- public static boolean hasMetastorageLocally(String locNodeName, String[] metastorageMembers) {
- boolean isLocNodeHasMetasorage = false;
-
- for (String name : metastorageMembers) {
- if (name.equals(locNodeName)) {
- isLocNodeHasMetasorage = true;
-
- break;
- }
- }
- return isLocNodeHasMetasorage;
- }
-
- // TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
- /** Cursor wrapper. */
- private final class CursorWrapper<T> implements Cursor<T> {
- /** MetaStorage service future. */
- private final CompletableFuture<MetaStorageService> metaStorageSvcFut;
-
- /** Inner cursor future. */
- private final CompletableFuture<Cursor<T>> innerCursorFut;
-
- /** Inner iterator future. */
- private final CompletableFuture<Iterator<T>> innerIterFut;
-
- /**
- * @param metaStorageSvcFut MetaStorage service future.
- * @param innerCursorFut Inner cursor future.
- */
- CursorWrapper(
- CompletableFuture<MetaStorageService> metaStorageSvcFut,
- CompletableFuture<Cursor<T>> innerCursorFut
- ) {
- this.metaStorageSvcFut = metaStorageSvcFut;
- this.innerCursorFut = innerCursorFut;
- this.innerIterFut = innerCursorFut.thenApply(Iterable::iterator);
- }
-
- /** {@inheritDoc} */
- @Override public void close() throws Exception {
- innerCursorFut.thenCompose(cursor -> {
- try {
- cursor.close();
-
- return null;
- }
- catch (Exception e) {
- throw new IgniteInternalException(e);
- }
- }).get();
- }
-
- /** {@inheritDoc} */
- @NotNull @Override public Iterator<T> iterator() {
- return new Iterator<>() {
- /** {@inheritDoc} */
- @Override public boolean hasNext() {
- try {
- return innerIterFut.thenApply(Iterator::hasNext).get();
- }
- catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public T next() {
- try {
- return innerIterFut.thenApply(Iterator::next).get();
- }
- catch (InterruptedException | ExecutionException e) {
- throw new IgniteInternalException(e);
- }
- }
- };
- }
- }
}
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
index 1360668..4eb1901 100644
--- a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/WatchAggregatorTest.java
@@ -46,11 +46,11 @@ public class WatchAggregatorTest {
watchAggregator.add(new Key("2"), lsnr2);
var watchEvent1 = new WatchEvent(
- entry("1", "value1", 1, 1),
- entry("1", "value1n", 1, 1));
+ entry("1", "value1", 1),
+ entry("1", "value1n", 1));
var watchEvent2 = new WatchEvent(
- entry("2", "value2", 1, 1),
- entry("2", "value2n", 1, 1));
+ entry("2", "value2", 1),
+ entry("2", "value2n", 1));
watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(Arrays.asList(watchEvent1, watchEvent2));
verify(lsnr1).onUpdate(Collections.singletonList(watchEvent1));
@@ -68,11 +68,11 @@ public class WatchAggregatorTest {
var id2 = watchAggregator.add(new Key("2"), lsnr2);
var watchEvent1 = new WatchEvent(
- entry("1", "value1", 1, 1),
- entry("1", "value1n", 1, 1));
+ entry("1", "value1", 1),
+ entry("1", "value1n", 1));
var watchEvent2 = new WatchEvent(
- entry("2", "value2", 1, 1),
- entry("2", "value2n", 1, 1));
+ entry("2", "value2", 1),
+ entry("2", "value2n", 1));
watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(Arrays.asList(watchEvent1, watchEvent2));
verify(lsnr1, times(1)).onUpdate(any());
@@ -96,11 +96,11 @@ public class WatchAggregatorTest {
var id2 = watchAggregator.add(new Key("2"), lsnr2);
var watchEvent1 = new WatchEvent(
- entry("1", "value1", 1, 1),
- entry("1", "value1n", 1, 1));
+ entry("1", "value1", 1),
+ entry("1", "value1n", 1));
var watchEvent2 = new WatchEvent(
- entry("2", "value2", 1, 1),
- entry("2", "value2n", 1, 1));
+ entry("2", "value2", 1),
+ entry("2", "value2n", 1));
watchAggregator.watch(1, (v1, v2) -> {}).get().listener().onUpdate(Arrays.asList(watchEvent1, watchEvent2));
verify(lsnr1, times(1)).onUpdate(any());
@@ -113,7 +113,7 @@ public class WatchAggregatorTest {
}
- private Entry entry(String key, String value, long revision, long updateCntr) {
+ private Entry entry(String key, String value, long revision) {
return new Entry() {
@Override public @NotNull Key key() {
return new Key(key);
@@ -126,10 +126,6 @@ public class WatchAggregatorTest {
@Override public long revision() {
return revision;
}
-
- @Override public long updateCounter() {
- return updateCntr;
- }
};
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 6dc77aa..3444091 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -102,16 +102,4 @@ public class Loza {
if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name()))
raftServer.clearListener(groupId);
}
- /**
- * Creates a {@link RaftGroupServiceImpl} for the given raft group.
- * <p>
- * Each node in {@code peers} is converted to a {@link Peer}. The remaining constructor arguments are
- * {@code clusterNetSvc}, {@code FACTORY}, {@code TIMEOUT}, {@code DELAY} and a literal {@code true}
- * (NOTE(review): the meaning of that flag is not visible here - check the RaftGroupServiceImpl constructor).
- *
- * @param groupId Raft group id.
- * @param peers Cluster nodes to be used as the peers of the group.
- * @return Newly created raft group service.
- */
- public RaftGroupService startRaftService(String groupId, List<ClusterNode> peers) {
- return new RaftGroupServiceImpl(
- groupId,
- clusterNetSvc,
- FACTORY,
- TIMEOUT,
- peers.stream().map(Peer::new).collect(Collectors.toList()),
- true,
- DELAY
- );
- }
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
index c6a1359..166e5c5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
@@ -35,7 +35,7 @@ class IgnitionTest {
"{\n" +
" \"node\": {\n" +
" \"name\":node0,\n" +
- " \"metastorageNodes\":[ \"node0\" ]\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
" },\n" +
" \"network\": {\n" +
" \"port\":3344,\n" +
@@ -46,7 +46,7 @@ class IgnitionTest {
"{\n" +
" \"node\": {\n" +
" \"name\":node1,\n" +
- " \"metastorageNodes\":[ \"node0\" ]\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
" },\n" +
" \"network\": {\n" +
" \"port\":3345,\n" +
@@ -57,7 +57,7 @@ class IgnitionTest {
"{\n" +
" \"node\": {\n" +
" \"name\":node2,\n" +
- " \"metastorageNodes\":[ \"node0\" ]\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
" },\n" +
" \"network\": {\n" +
" \"port\":3346,\n" +
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index 513a3d0..4e608eb 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -20,8 +20,12 @@ package org.apache.ignite.internal.app;
import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
@@ -45,12 +49,22 @@ import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.internal.vault.impl.VaultServiceImpl;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.metastorage.client.MetaStorageService;
+import org.apache.ignite.metastorage.common.Condition;
+import org.apache.ignite.metastorage.common.Cursor;
+import org.apache.ignite.metastorage.common.Entry;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
+import org.apache.ignite.metastorage.common.WatchListener;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.message.MessageSerializationRegistry;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.utils.IgniteProperties;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Implementation of an entry point for handling grid lifecycle.
@@ -141,9 +155,9 @@ public class IgnitionImpl implements Ignition {
// MetaStorage Component startup.
MetaStorageManager metaStorageMgr = new MetaStorageManager(
vaultMgr,
- locConfigurationMgr,
clusterNetSvc,
- raftMgr
+ raftMgr,
+ metaStorageServiceMock()
);
// TODO IGNITE-14578 Bootstrap configuration manager with distributed configuration.
@@ -194,4 +208,99 @@ public class IgnitionImpl implements Ignition {
LOG.info(banner + '\n' + " ".repeat(22) + "Apache Ignite ver. " + ver + '\n');
}
+
+ /**
+ * Builds a placeholder {@link MetaStorageService}.
+ * <p>
+ * The three {@code watch} overloads return an already completed future carrying a new {@link IgniteUuid}
+ * created from a random {@link UUID} and {@code 0L}; the supplied listener is never used. Every other method
+ * throws {@link UnsupportedOperationException} directly from the call instead of returning a failed future
+ * or a cursor.
+ * <p>
+ * TODO: remove when metastorage service will be ready.
+ *
+ * @return Stub metastorage service.
+ */
+ private static MetaStorageService metaStorageServiceMock() {
+ return new MetaStorageService() {
+ @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, @NotNull byte[] value) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Void> remove(@NotNull Key key) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override
+ public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition,
+ @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition,
+ @NotNull Operation success, @NotNull Operation failure) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<IgniteUuid> watch(@Nullable Key keyFrom, @Nullable Key keyTo,
+ long revision, @NotNull WatchListener lsnr) {
+ return CompletableFuture.completedFuture(new IgniteUuid(UUID.randomUUID(), 0L));
+ }
+
+ @Override public @NotNull CompletableFuture<IgniteUuid> watch(@NotNull Key key, long revision,
+ @NotNull WatchListener lsnr) {
+ return CompletableFuture.completedFuture(new IgniteUuid(UUID.randomUUID(), 0L));
+ }
+
+ @Override public @NotNull CompletableFuture<IgniteUuid> watch(@NotNull Collection<Key> keys, long revision,
+ @NotNull WatchListener lsnr) {
+ return CompletableFuture.completedFuture(new IgniteUuid(UUID.randomUUID(), 0L));
+ }
+
+ @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+
+ @Override public @NotNull CompletableFuture<Void> compact() {
+ throw new UnsupportedOperationException("Metastorage service is not implemented yet");
+ }
+ };
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4d47435..012d57b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -106,7 +106,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
configurationMgr.configurationRegistry().getConfiguration(ClusterConfiguration.KEY)
.metastorageNodes().listen(ctx -> {
if (ctx.newValue() != null) {
- if (MetaStorageManager.hasMetastorageLocally(localNodeName, ctx.newValue()))
+ if (hasMetastorageLocally(localNodeName, ctx.newValue()))
subscribeForTableCreation();
else
unsubscribeForTableCreation();
@@ -118,7 +118,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
String[] metastorageMembers = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
.metastorageNodes().value();
- if (MetaStorageManager.hasMetastorageLocally(localNodeName, metastorageMembers))
+ if (hasMetastorageLocally(localNodeName, metastorageMembers))
subscribeForTableCreation();
String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
@@ -201,6 +201,26 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
/**
+ * Checks whether the local node hosts Metastorage.
+ * <p>
+ * Scans {@code metastorageMembers} for an element equal to {@code localNodeName} and stops at the first match.
+ *
+ * @param localNodeName Local node unique name.
+ * @param metastorageMembers Metastorage members names. Must not be {@code null} and must not contain
+ * {@code null} elements, because the array is iterated and {@code equals} is called on each element.
+ * @return True if the node has Metastorage, false otherwise.
+ */
+ private boolean hasMetastorageLocally(String localNodeName, String[] metastorageMembers) {
+ boolean isLocalNodeHasMetasorage = false;
+
+ for (String name : metastorageMembers) {
+ if (name.equals(localNodeName)) {
+ isLocalNodeHasMetasorage = true;
+
+ break;
+ }
+ }
+ return isLocalNodeHasMetasorage;
+ }
+
+ /**
* Subscribes on table create.
*/
private void subscribeForTableCreation() {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 4953c15..c7dcfa3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -72,7 +72,7 @@ public class InternalTableImpl implements InternalTable {
/**
* {@inheritDoc}
* <p>
* Runs a {@link GetCommand} on the {@code partitionMap} entry selected by {@code keyRow.hash() % partitions}
* and extracts the row from the returned {@link KVGetResponse} via {@code getValue()}.
* <p>
* NOTE(review): the remainder is negative whenever {@code hash()} is negative - confirm that such a key
* still maps to an existing partition.
*/
@Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
- .thenApply(KVGetResponse::getValue);
+ .thenApply(response -> response.getValue());
}
/** {@inheritDoc} */
@@ -82,7 +82,8 @@ public class InternalTableImpl implements InternalTable {
/**
* {@inheritDoc}
* <p>
* Runs an {@link UpsertCommand} on the {@code partitionMap} entry selected by {@code row.hash() % partitions}.
* The {@code thenApply} stage on the added lines passes the response through unchanged.
* <p>
* NOTE(review): the remainder is negative whenever {@code hash()} is negative - confirm that such a row
* still maps to an existing partition.
*/
@Override public @NotNull CompletableFuture<Void> upsert(BinaryRow row) {
- return partitionMap.get(row.hash() % partitions).run(new UpsertCommand(row));
+ return partitionMap.get(row.hash() % partitions).<Void>run(new UpsertCommand(row))
+ .thenApply(response -> response);
}
/** {@inheritDoc} */
@@ -97,7 +98,8 @@ public class InternalTableImpl implements InternalTable {
/**
* {@inheritDoc}
* <p>
* Runs an {@link InsertCommand} on the {@code partitionMap} entry selected by {@code row.hash() % partitions}
* and completes with the {@code Boolean} produced by that command. The {@code thenApply} stage on the added
* lines passes the response through unchanged.
* <p>
* NOTE(review): the remainder is negative whenever {@code hash()} is negative - confirm that such a row
* still maps to an existing partition.
*/
@Override public @NotNull CompletableFuture<Boolean> insert(BinaryRow row) {
- return partitionMap.get(row.hash() % partitions).run(new InsertCommand(row));
+ return partitionMap.get(row.hash() % partitions).<Boolean>run(new InsertCommand(row))
+ .thenApply(response -> response);
}
/** {@inheritDoc} */
@@ -112,7 +114,8 @@ public class InternalTableImpl implements InternalTable {
/**
* {@inheritDoc}
* <p>
* Runs a {@link ReplaceCommand} carrying both rows on the {@code partitionMap} entry selected by
* {@code oldRow.hash() % partitions}; the hash of {@code newRow} is not used for routing. The
* {@code thenApply} stage on the added lines passes the response through unchanged.
* <p>
* NOTE(review): the remainder is negative whenever {@code hash()} is negative - confirm that such a row
* still maps to an existing partition.
*/
@Override public @NotNull CompletableFuture<Boolean> replace(BinaryRow oldRow, BinaryRow newRow) {
- return partitionMap.get(oldRow.hash() % partitions).run(new ReplaceCommand(oldRow, newRow));
+ return partitionMap.get(oldRow.hash() % partitions).<Boolean>run(new ReplaceCommand(oldRow, newRow))
+ .thenApply(response -> response);
}
/** {@inheritDoc} */
@@ -122,7 +125,8 @@ public class InternalTableImpl implements InternalTable {
/**
* {@inheritDoc}
* <p>
* Runs a {@link DeleteCommand} on the {@code partitionMap} entry selected by
* {@code keyRow.hash() % partitions} and completes with the {@code Boolean} produced by that command. The
* {@code thenApply} stage on the added lines passes the response through unchanged.
* <p>
* NOTE(review): the remainder is negative whenever {@code hash()} is negative - confirm that such a key
* still maps to an existing partition.
*/
@Override public @NotNull CompletableFuture<Boolean> delete(BinaryRow keyRow) {
- return partitionMap.get(keyRow.hash() % partitions).run(new DeleteCommand(keyRow));
+ return partitionMap.get(keyRow.hash() % partitions).<Boolean>run(new DeleteCommand(keyRow))
+ .thenApply(response -> response);
}
/** {@inheritDoc} */