You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by as...@apache.org on 2021/04/27 07:51:35 UTC
[ignite-3] branch main updated: IGNITE-14235 Basic table management
- Fixes #103.
This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0de7681 IGNITE-14235 Basic table management - Fixes #103.
0de7681 is described below
commit 0de76810c8a610ec421b774f13152327efe8ea0d
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue Apr 27 10:45:34 2021 +0300
IGNITE-14235 Basic table management - Fixes #103.
Signed-off-by: Alexey Scherbakov <al...@gmail.com>
---
modules/affinity/pom.xml | 10 +
.../ignite/internal/affinity/AffinityManager.java | 163 +++++++-
.../affinity/RendezvousAffinityFunction.java | 410 +++++++++++++++++++++
.../affinity/RendezvousAffinityFunctionTest.java | 206 +++++++++++
.../main/java/org/apache/ignite/app/Ignite.java | 8 +-
.../network/NetworkConfigurationSchema.java | 8 +-
.../schemas/rest/RestConfigurationSchema.java | 5 +-
.../schemas/runner/ClusterConfigurationSchema.java | 2 +-
.../NodeConfigurationSchema.java} | 23 +-
.../schemas/table/TableConfigurationSchema.java} | 29 +-
.../schemas/table/TablesConfigurationSchema.java} | 21 +-
.../{TableManager.java => IgniteTables.java} | 34 +-
.../ignite/internal/baseline/BaselineManager.java | 11 +
.../configuration/ConfigurationRegistry.java | 2 +-
.../internal/tostring/IgniteToStringBuilder.java | 1 -
.../apache/ignite/internal/util/ArrayUtils.java | 5 +
.../apache/ignite/internal/util/IgniteUtils.java | 51 +++
.../java/org/apache/ignite/lang/IgniteBiTuple.java | 316 ++++++++++++++++
modules/metastorage/pom.xml | 5 +
.../internal/metastorage/MetaStorageManager.java | 48 ++-
.../network/MetaStorageMessageTypes.java | 47 ---
.../org/apache/ignite/network/ClusterNode.java | 10 +
.../client/service/impl/RaftGroupServiceImpl.java | 4 +-
.../raft/client/service/RaftGroupServiceTest.java | 4 +-
.../raft/server/ITRaftCounterServerTest.java | 52 ++-
.../java/org/apache/ignite/internal/raft/Loza.java | 55 ++-
.../ignite/raft/server/impl/RaftServerImpl.java | 41 +--
modules/rest/pom.xml | 11 -
.../InMemoryConfigurationStorage.java | 89 -----
modules/runner/pom.xml | 13 -
.../ignite/internal/runner/app/IgnitionTest.java | 12 +
.../extended/BaselineConfigurationSchema.java | 30 --
.../extended/DataStorageConfigurationSchema.java | 37 --
.../org/apache/ignite/internal/app/IgniteImpl.java | 8 +-
.../apache/ignite/internal/app/IgnitionImpl.java | 56 +--
.../org/apache/ignite/utils/IgniteProperties.java | 1 +
.../ignite/internal/schema/ByteBufferRow.java | 8 +-
.../ignite/internal/schema/SchemaManager.java | 38 ++
.../marshaller/asm/AsmSerializerGenerator.java | 6 +-
modules/table/pom.xml | 16 +
.../ignite/internal/table/AbstractTableView.java | 4 +-
.../ignite/internal/table/KVBinaryViewImpl.java | 2 +-
.../apache/ignite/internal/table/KVViewImpl.java | 2 +-
.../ignite/internal/table/RecordViewImpl.java | 2 +-
.../apache/ignite/internal/table/TableImpl.java | 2 +-
...ableSchemaManager.java => TableSchemaView.java} | 2 +-
.../ignite/internal/table/TupleMarshallerImpl.java | 4 +-
.../internal/table/distributed/TableManager.java | 304 +++++++++++++++
.../table/distributed/TableManagerImpl.java | 69 ----
.../table/distributed/command/DeleteCommand.java | 90 +++++
.../table/distributed/command/GetCommand.java | 89 +++++
.../table/distributed/command/InsertCommand.java | 89 +++++
.../table/distributed/command/ReplaceCommand.java | 114 ++++++
.../table/distributed/command/UpsertCommand.java | 89 +++++
.../command/response/KVGetResponse.java | 88 +++++
.../distributed/raft/PartitionCommandListener.java | 166 +++++++++
.../distributed/storage/InternalTableImpl.java | 138 +++++++
.../table/distributed/DistributedTableTest.java | 349 ++++++++++++++++++
.../ignite/table/impl/DummySchemaManagerImpl.java | 4 +-
modules/vault/pom.xml | 7 +
.../apache/ignite/internal/vault/VaultManager.java | 10 +
.../ignite/internal/vault/common/VaultEntry.java | 70 ++++
parent/pom.xml | 1 +
63 files changed, 3152 insertions(+), 439 deletions(-)
diff --git a/modules/affinity/pom.xml b/modules/affinity/pom.xml
index 3cc6788..981b8e7 100644
--- a/modules/affinity/pom.xml
+++ b/modules/affinity/pom.xml
@@ -35,6 +35,11 @@
<dependencies>
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-raft</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-configuration</artifactId>
</dependency>
@@ -45,6 +50,11 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-baseline</artifactId>
</dependency>
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 7e28308..16bca40 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -17,15 +17,37 @@
package org.apache.ignite.internal.affinity;
-import org.apache.ignite.internal.baseline.BaselineManager;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
+import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.jetbrains.annotations.NotNull;
/**
* Affinity manager is responsible for affinity function related logic including calculating affinity assignments.
*/
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class AffinityManager {
+public class AffinityManager {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(AffinityManager.class);
+
+ /** Tables prefix for the metastorage. */
+ private static final String INTERNAL_PREFIX = "internal.tables.";
+
/**
* MetaStorage manager in order to watch private distributed affinity specific configuration,
* cause ConfigurationManger handles only public configuration.
@@ -38,23 +60,142 @@ import org.apache.ignite.internal.metastorage.MetaStorageManager;
/** Baseline manager. */
private final BaselineManager baselineMgr;
+ /** Vault manager. */
+ private final VaultManager vaultManager;
+
+ /** Affinity calculate subscription future. */
+ private CompletableFuture<Long> affinityCalculateSubscriptionFut = null;
+
/**
- * The constructor.
- *
- * @param configurationMgr Configuration manager.
- * @param metaStorageMgr Meta storage manager.
+ * @param configurationMgr Configuration module.
+ * @param metaStorageMgr Meta storage service.
+ * @param baselineMgr Baseline manager.
+ * @param vaultManager Vault manager.
*/
public AffinityManager(
ConfigurationManager configurationMgr,
MetaStorageManager metaStorageMgr,
- BaselineManager baselineMgr
+ BaselineManager baselineMgr,
+ VaultManager vaultManager
) {
this.configurationMgr = configurationMgr;
this.metaStorageMgr = metaStorageMgr;
this.baselineMgr = baselineMgr;
+ this.vaultManager = vaultManager;
+
+ String localNodeName = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
+ .name().value();
+
+ configurationMgr.configurationRegistry().getConfiguration(ClusterConfiguration.KEY)
+ .metastorageNodes().listen(ctx -> {
+ if (ctx.newValue() != null) {
+ if (hasMetastorageLocally(localNodeName, ctx.newValue()))
+ subscribeToAssignmentCalculation();
+ else
+ unsubscribeFromAssignmentCalculation();
+ }
+ return CompletableFuture.completedFuture(null);
+ });
+
+ String[] metastorageMembers = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
+ .metastorageNodes().value();
+
+ if (hasMetastorageLocally(localNodeName, metastorageMembers))
+ subscribeToAssignmentCalculation();
+ }
+
+    /**
+     * Tells whether the local node is listed among the nodes that host the Metastorage.
+     *
+     * @param localNodeName Unique name of the local node.
+     * @param metastorageMembers Names of the Metastorage members.
+     * @return {@code true} if the local node name is one of the member names, {@code false} otherwise.
+     */
+    private boolean hasMetastorageLocally(String localNodeName, String[] metastorageMembers) {
+        for (String memberName : metastorageMembers) {
+            if (memberName.equals(localNodeName))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Subscribes to updates of the table assignment keys in the Metastorage.
+     * <p>
+     * A watch is registered on the {@code internal.tables.assignment.#} key. For each updated entry whose new value
+     * is empty, the listener parses the table ID from the key, reads the table name from the vault, takes the
+     * partition and replica counts from the tables configuration and invokes a Metastorage update that puts the
+     * calculated assignment under the same key (presumably only while the value still equals the observed one,
+     * with a no-op otherwise).
+     */
+    private void subscribeToAssignmentCalculation() {
+        assert affinityCalculateSubscriptionFut == null : "Affinity calculation already subscribed";
+
+        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+
+        affinityCalculateSubscriptionFut = metaStorageMgr.registerWatch(new Key(tableInternalPrefix), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    // Only entries without a value are still waiting for an assignment.
+                    if (ArrayUtils.empty(evt.newEntry().value())) {
+                        String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                        String placeholderValue = keyTail.substring(0, keyTail.indexOf('.'));
+
+                        UUID tblId = UUID.fromString(placeholderValue);
+
+                        try {
+                            String name = new String(vaultManager.get((INTERNAL_PREFIX + tblId.toString())
+                                .getBytes(StandardCharsets.UTF_8)).get().value(), StandardCharsets.UTF_8);
+
+                            int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).partitions().value();
+                            int replicas = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).replicas().value();
+
+                            metaStorageMgr.invoke(evt.newEntry().key(),
+                                Conditions.value().eq(evt.newEntry().value()),
+                                Operations.put(IgniteUtils.toBytes(
+                                    RendezvousAffinityFunction.assignPartitions(
+                                        baselineMgr.nodes(),
+                                        partitions,
+                                        replicas,
+                                        false,
+                                        null
+                                    ))),
+                                Operations.noop());
+
+                            LOG.info("Affinity manager calculated assignment for the table [name={}, tblId={}]",
+                                name, tblId);
+                        }
+                        catch (InterruptedException e) {
+                            // Restore the interrupt flag so the calling thread can still observe the interruption.
+                            Thread.currentThread().interrupt();
+
+                            LOG.error("Failed to initialize affinity [key={}]",
+                                evt.newEntry().key().toString(), e);
+                        }
+                        catch (ExecutionException e) {
+                            LOG.error("Failed to initialize affinity [key={}]",
+                                evt.newEntry().key().toString(), e);
+                        }
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
 }
- // TODO: IGNITE-14237 Affinity function.
- // TODO: IGNITE-14238 Creating and destroying caches.
- // TODO: IGNITE-14235 Provide a minimal cache/table configuration.
+    /**
+     * Unsubscribes the listener from the affinity calculation.
+     * Does nothing if there is no subscription. If waiting for the subscription ID fails, the error is logged and
+     * the subscription future is kept.
+     */
+    private void unsubscribeFromAssignmentCalculation() {
+        if (affinityCalculateSubscriptionFut == null)
+            return;
+
+        try {
+            Long subscriptionId = affinityCalculateSubscriptionFut.get();
+
+            metaStorageMgr.unregisterWatch(subscriptionId);
+
+            affinityCalculateSubscriptionFut = null;
+        }
+        catch (InterruptedException e) {
+            // Restore the interrupt flag so the calling thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+
+            LOG.error("Couldn't unsubscribe for Metastorage updates", e);
+        }
+        catch (ExecutionException e) {
+            LOG.error("Couldn't unsubscribe for Metastorage updates", e);
+        }
+    }
}
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
new file mode 100644
index 0000000..da5e6b1
--- /dev/null
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunction.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.UUID;
+import java.util.function.BiPredicate;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Affinity function for partitioned table based on Highest Random Weight algorithm. This function supports the
+ * following configuration:
+ * <ul>
+ * <li>
+ * {@code partitions} - Number of partitions to spread across nodes.
+ * </li>
+ * <li>
+ * {@code excludeNeighbors} - If set to {@code true}, will exclude same-host-neighbors
+ * from being replicas of each other. This flag can be ignored when the topology does not have enough nodes
+ * to assign all replicas.
+ * Note that {@code nodeFilter} is ignored if {@code excludeNeighbors} is set to {@code true}.
+ * </li>
+ * <li>
+ * {@code nodeFilter} - Optional filter for replica nodes. If provided, then only
+ * nodes that pass this filter will be selected as replica nodes. If not provided, then
+ * replicas nodes will be selected out of all nodes available for this table.
+ * </li>
+ * </ul>
+ */
+public class RendezvousAffinityFunction {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(RendezvousAffinityFunction.class);
+
+ /** Comparator. */
+ private static final Comparator<IgniteBiTuple<Long, ClusterNode>> COMPARATOR = new HashComparator();
+
+ /** Maximum number of partitions. */
+ public static final int MAX_PARTITIONS_COUNT = 65000;
+
+ /** Exclude neighbors warning. */
+ private static boolean exclNeighborsWarn;
+
+    /**
+     * Returns the object whose hash code represents the node when partition hashes are calculated.
+     *
+     * @param node Cluster node.
+     * @return The node name.
+     */
+    public static Object resolveNodeHash(ClusterNode node) {
+        Object nodeHash = node.name();
+
+        return nodeHash;
+    }
+
+    /**
+     * Returns the nodes assigned to the specified partition.
+     * <p>
+     * Every node gets a hash mixed from its name hash code and the partition number; nodes are then taken in the
+     * order of that hash until {@code replicas} nodes (but no more than the topology size) are collected.
+     * If the topology contains at most one node, the passed {@code nodes} list itself is returned.
+     *
+     * @param part Partition.
+     * @param nodes Nodes.
+     * @param replicas Number of partition replicas; {@link Integer#MAX_VALUE} requests an assignment that
+     *      contains every node.
+     * @param neighborhoodCache Neighborhood (node ID to the group of its neighbors); read only when
+     *      {@code exclNeighbors} is {@code true}.
+     * @param exclNeighbors If true neighbors are excluded, false otherwise.
+     * @param nodeFilter Filter for nodes, may be {@code null}; not consulted when {@code exclNeighbors}
+     *      is {@code true}.
+     * @return Assignment.
+     */
+    public static List<ClusterNode> assignPartition(
+        int part,
+        List<ClusterNode> nodes,
+        int replicas,
+        Map<UUID, Collection<ClusterNode>> neighborhoodCache,
+        boolean exclNeighbors,
+        BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
+    ) {
+        if (nodes.size() <= 1)
+            return nodes;
+
+        // Generic array creation: the array is filled below with tuples of exactly this type.
+        @SuppressWarnings("unchecked")
+        IgniteBiTuple<Long, ClusterNode>[] hashArr =
+            (IgniteBiTuple<Long, ClusterNode>[])new IgniteBiTuple[nodes.size()];
+
+        for (int i = 0; i < nodes.size(); i++) {
+            ClusterNode node = nodes.get(i);
+
+            Object nodeHash = resolveNodeHash(node);
+
+            long hash = hash(nodeHash.hashCode(), part);
+
+            hashArr[i] = new IgniteBiTuple<>(hash, node);
+        }
+
+        final int effectiveReplicas = replicas == Integer.MAX_VALUE ? nodes.size() : Math.min(replicas, nodes.size());
+
+        Iterable<ClusterNode> sortedNodes = new LazyLinearSortedContainer(hashArr, effectiveReplicas);
+
+        // REPLICATED cache case
+        if (replicas == Integer.MAX_VALUE)
+            return replicatedAssign(nodes, sortedNodes);
+
+        Iterator<ClusterNode> it = sortedNodes.iterator();
+
+        List<ClusterNode> res = new ArrayList<>(effectiveReplicas);
+
+        Collection<ClusterNode> allNeighbors = new HashSet<>();
+
+        ClusterNode first = it.next();
+
+        res.add(first);
+
+        if (exclNeighbors)
+            allNeighbors.addAll(neighborhoodCache.get(first.id()));
+
+        // Select another replicas.
+        if (replicas > 1) {
+            while (it.hasNext() && res.size() < effectiveReplicas) {
+                ClusterNode node = it.next();
+
+                if (exclNeighbors) {
+                    if (!allNeighbors.contains(node)) {
+                        res.add(node);
+
+                        allNeighbors.addAll(neighborhoodCache.get(node.id()));
+                    }
+                }
+                else if (nodeFilter == null || nodeFilter.test(node, res))
+                    res.add(node); // Neighbors are not tracked here: this branch runs only without exclusion.
+            }
+        }
+
+        if (res.size() < effectiveReplicas && nodes.size() >= effectiveReplicas && exclNeighbors) {
+            // Need to iterate again in case if there are no nodes which pass exclude neighbors replicas criteria.
+            it = sortedNodes.iterator();
+
+            it.next();
+
+            while (it.hasNext() && res.size() < effectiveReplicas) {
+                ClusterNode node = it.next();
+
+                if (!res.contains(node))
+                    res.add(node);
+            }
+
+            if (!exclNeighborsWarn) {
+                LOG.warn("Affinity function excludeNeighbors property is ignored " +
+                    "because topology has no enough nodes to assign all replicas.");
+
+                exclNeighborsWarn = true;
+            }
+        }
+
+        assert res.size() <= effectiveReplicas;
+
+        return res;
+    }
+
+    /**
+     * Builds the assignment for a REPLICATED table: the first node of the sorted order followed by all other
+     * topology nodes in their topology order.
+     *
+     * @param nodes Topology.
+     * @param sortedNodes Nodes sorted for the partition being assigned.
+     * @return Assignment.
+     */
+    private static List<ClusterNode> replicatedAssign(List<ClusterNode> nodes,
+        Iterable<ClusterNode> sortedNodes) {
+        ClusterNode primary = sortedNodes.iterator().next();
+
+        List<ClusterNode> assignment = new ArrayList<>(nodes.size());
+
+        assignment.add(primary);
+
+        for (ClusterNode candidate : nodes) {
+            if (candidate.equals(primary))
+                continue;
+
+            assignment.add(candidate);
+        }
+
+        assert assignment.size() == nodes.size() : "Not enough replicas: " + assignment.size();
+
+        return assignment;
+    }
+
+ /**
+ * Packs two 32-bit keys into a single long ({@code key0} in the lower half, {@code key1} in the upper half) and
+ * mixes it with a hash function based on the Wang/Jenkins hash. Within this class it is called with the node hash
+ * code and the partition number.
+ *
+ * @param key0 Hash key that fills the lower 32 bits.
+ * @param key1 Hash key that fills the upper 32 bits.
+ * @return Long hash key.
+ * @see <a href="https://gist.github.com/badboy/6267743#64-bit-mix-functions">64 bit mix functions</a>
+ */
+ private static long hash(int key0, int key1) {
+ long key = (key0 & 0xFFFFFFFFL)
+ | ((key1 & 0xFFFFFFFFL) << 32);
+
+ key = (~key) + (key << 21); // key = (key << 21) - key - 1;
+ key ^= (key >>> 24);
+ key += (key << 3) + (key << 8); // key * 265
+ key ^= (key >>> 14);
+ key += (key << 2) + (key << 4); // key * 21
+ key ^= (key >>> 28);
+ key += (key << 31);
+
+ return key;
+ }
+
+    /**
+     * Calculates the assignment of every partition for the given topology.
+     *
+     * @param currentTopologySnapshot List of topology nodes.
+     * @param partitions Number of table partitions.
+     * @param replicas Number of partition replicas.
+     * @param exclNeighbors If true, neighbors are excluded from the assignment of a single partition,
+     *      false otherwise.
+     * @param nodeFilter Filter for nodes.
+     * @return List of node lists indexed by partition.
+     */
+    public static List<List<ClusterNode>> assignPartitions(
+        Collection<ClusterNode> currentTopologySnapshot,
+        int partitions,
+        int replicas,
+        boolean exclNeighbors,
+        BiPredicate<ClusterNode, List<ClusterNode>> nodeFilter
+    ) {
+        assert partitions <= MAX_PARTITIONS_COUNT : "partitions <= " + MAX_PARTITIONS_COUNT;
+        assert partitions > 0 : "parts > 0";
+        assert replicas > 0 : "replicas > 0";
+
+        List<List<ClusterNode>> result = new ArrayList<>(partitions);
+
+        // The neighborhood map is built only when neighbors have to be excluded.
+        Map<UUID, Collection<ClusterNode>> neighborhood = null;
+
+        if (exclNeighbors)
+            neighborhood = neighbors(currentTopologySnapshot);
+
+        List<ClusterNode> topology = new ArrayList<>(currentTopologySnapshot);
+
+        for (int part = 0; part < partitions; part++)
+            result.add(assignPartition(part, topology, replicas, neighborhood, exclNeighbors, nodeFilter));
+
+        return result;
+    }
+
+    /**
+     * Builds the neighborhood map for all nodes of the snapshot: every node ID is mapped to the group of nodes
+     * that share the same grouping key with it.
+     *
+     * @param topSnapshot Topology snapshot.
+     * @return Neighbors map.
+     */
+    public static Map<UUID, Collection<ClusterNode>> neighbors(Collection<ClusterNode> topSnapshot) {
+        Map<String, Collection<ClusterNode>> groupsByMac = new HashMap<>(topSnapshot.size(), 1.0f);
+
+        // Group by mac addresses.
+        for (ClusterNode node : topSnapshot) {
+            // The node hash code currently stands in for the MAC addresses attribute
+            // (node.attribute(IgniteNodeAttributes.ATTR_MACS)).
+            String macs = String.valueOf(node.hashCode());
+
+            groupsByMac.computeIfAbsent(macs, k -> new HashSet<>()).add(node);
+        }
+
+        Map<UUID, Collection<ClusterNode>> res = new HashMap<>(topSnapshot.size(), 1.0f);
+
+        for (Collection<ClusterNode> group : groupsByMac.values()) {
+            for (ClusterNode member : group)
+                res.put(member.id(), group);
+        }
+
+        return res;
+    }
+
+    /**
+     * Orders (hash, node) tuples by the hash and, for equal hashes, by the node ID.
+     */
+    private static class HashComparator implements Comparator<IgniteBiTuple<Long, ClusterNode>>, Serializable {
+        /** Serial version UID. */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public int compare(IgniteBiTuple<Long, ClusterNode> o1, IgniteBiTuple<Long, ClusterNode> o2) {
+            int byHash = Long.compare(o1.get1(), o2.get1());
+
+            if (byHash != 0)
+                return byHash;
+
+            return o1.get2().id().compareTo(o2.get2().id());
+        }
+    }
+
+ /**
+ * Iterable view over a node-hash array that yields nodes in the {@code COMPARATOR} order. The array is either
+ * sorted eagerly in the constructor or sorted lazily in place: each {@code next()} call of an iterator selects
+ * the minimum of the not yet returned elements with a linear scan.
+ */
+ private static class LazyLinearSortedContainer implements Iterable<ClusterNode> {
+ /** Initial node-hash array. It is reordered in place while sorting. */
+ private final IgniteBiTuple<Long, ClusterNode>[] arr;
+
+ /** Number of leading array elements that iterators return directly, without scanning for a minimum. */
+ private int sorted;
+
+ /**
+ * Sorts the whole array at once when {@code needFirstSortedCnt} is greater than {@code (int)Math.log(arr.length)};
+ * otherwise leaves the sorting to the iterators.
+ *
+ * @param arr Node / partition hash list.
+ * @param needFirstSortedCnt Estimate count of elements to return by iterator.
+ */
+ LazyLinearSortedContainer(IgniteBiTuple<Long, ClusterNode>[] arr, int needFirstSortedCnt) {
+ this.arr = arr;
+
+ if (needFirstSortedCnt > (int)Math.log(arr.length)) {
+ Arrays.sort(arr, COMPARATOR);
+
+ sorted = arr.length;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<ClusterNode> iterator() {
+ return new SortIterator();
+ }
+
+ /**
+ * Iterator that returns nodes in sorted order. It shares {@code arr} and {@code sorted} with the enclosing
+ * container, so the part of the array ordered by one iterator is reused by iterators created later.
+ */
+ private class SortIterator implements Iterator<ClusterNode> {
+ /** Index of the next element to return. */
+ private int cur;
+
+ /** {@inheritDoc} */
+ @Override public boolean hasNext() {
+ return cur < arr.length;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ if (cur < sorted)
+ return arr[cur++].get2();
+
+ IgniteBiTuple<Long, ClusterNode> min = arr[cur];
+
+ int minIdx = cur;
+
+ for (int i = cur + 1; i < arr.length; i++) {
+ if (COMPARATOR.compare(arr[i], min) < 0) {
+ minIdx = i;
+
+ min = arr[i];
+ }
+ }
+
+ if (minIdx != cur) {
+ arr[minIdx] = arr[cur];
+
+ arr[cur] = min;
+ }
+
+ sorted = cur++;
+
+ return min.get2();
+ }
+
+ /** Removal is not supported; always throws {@link UnsupportedOperationException}. */
+ @Override public void remove() {
+ throw new UnsupportedOperationException("Remove doesn't supported");
+ }
+ }
+ }
+
+    /**
+     * Returns the simple class name followed by an empty field list; the class declares no instance fields.
+     *
+     * @return String representation.
+     */
+    @Override public String toString() {
+        return getClass().getSimpleName() + " []";
+    }
+
+    /**
+     * Returns the absolute value of an integer, mapping {@link Integer#MIN_VALUE} (which has no positive
+     * counterpart) to {@code 0}.
+     *
+     * @param i Integer.
+     * @return Absolute value.
+     */
+    public static int safeAbs(int i) {
+        return i == Integer.MIN_VALUE ? 0 : Math.abs(i);
+    }
+}
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
new file mode 100644
index 0000000..2dd46df
--- /dev/null
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/RendezvousAffinityFunctionTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.function.Function;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.Test;
+
+import static java.util.Objects.nonNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test for affinity function.
+ */
+public class RendezvousAffinityFunctionTest {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(RendezvousAffinityFunctionTest.class);
+
+ /** Affinity deviation ratio. */
+ public static final double AFFINITY_DEVIATION_RATIO = 0.2;
+
+    /**
+     * Checks that the number of partitions assigned to every node stays within
+     * {@link #AFFINITY_DEVIATION_RATIO} of the ideal even share.
+     */
+    @Test
+    public void testPartitionDistribution() {
+        int nodes = 50;
+        int parts = 10_000;
+        int replicas = 4;
+
+        ArrayList<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+
+        assertTrue(parts > nodes, "Partitions should be more that nodes");
+
+        int ideal = (parts * replicas) / nodes;
+
+        List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions(
+            clusterNodes,
+            parts,
+            replicas,
+            false,
+            null
+        );
+
+        HashMap<ClusterNode, ArrayList<Integer>> assignmentByNode = new HashMap<>(nodes);
+
+        // Invert the assignment: collect the partitions owned by every node.
+        int part = 0;
+
+        for (List<ClusterNode> partNodes : assignment) {
+            for (ClusterNode owner : partNodes)
+                assignmentByNode.computeIfAbsent(owner, k -> new ArrayList<>()).add(part);
+
+            part++;
+        }
+
+        for (ClusterNode node : clusterNodes) {
+            ArrayList<Integer> nodeParts = assignmentByNode.get(node);
+
+            assertNotNull(nodeParts);
+
+            boolean aboveLowerBound = nodeParts.size() > ideal * (1 - AFFINITY_DEVIATION_RATIO);
+            boolean belowUpperBound = nodeParts.size() < ideal * (1 + AFFINITY_DEVIATION_RATIO);
+
+            assertTrue(aboveLowerBound && belowUpperBound,
+                "Partition distribution is too far from ideal [node=" + node
+                    + ", size=" + nodeParts.size()
+                    + ", idealSize=" + ideal
+                    + ", parts=" + compact(nodeParts) + ']');
+        }
+    }
+
+    /**
+     * Creates the requested number of cluster nodes named {@code "Node <index>"}.
+     *
+     * @param nodes Number of nodes to create.
+     * @return List of the created nodes.
+     */
+    @NotNull private ArrayList<ClusterNode> prepareNetworkTopology(int nodes) {
+        ArrayList<ClusterNode> topology = new ArrayList<>(nodes);
+
+        int idx = 0;
+
+        while (idx < nodes) {
+            topology.add(new ClusterNode("Node " + idx, "127.0.0.1", 121212));
+
+            idx++;
+        }
+
+        return topology;
+    }
+
+    /**
+     * Checks that a calculated assignment can be serialized with {@code IgniteUtils.toBytes} and restored with
+     * {@code IgniteUtils.fromBytes} to an equal object.
+     */
+    @Test
+    public void serializeAssignment() {
+        int nodes = 50;
+
+        int parts = 10_000;
+
+        int replicas = 4;
+
+        ArrayList<ClusterNode> clusterNodes = prepareNetworkTopology(nodes);
+
+        assertTrue(parts > nodes, "Partitions should be more that nodes");
+
+        List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions(
+            clusterNodes,
+            parts,
+            replicas,
+            false,
+            null
+        );
+
+        byte[] assignmentBytes = IgniteUtils.toBytes(assignment);
+
+        // The serialized form is what has to be checked here, not the source assignment.
+        assertNotNull(assignmentBytes);
+
+        LOG.info("Assignment is serialized successfully [bytes=" + assignmentBytes.length + ']');
+
+        List<List<ClusterNode>> deserializedAssignment = (List<List<ClusterNode>>)IgniteUtils.fromBytes(assignmentBytes);
+
+        assertNotNull(deserializedAssignment);
+
+        assertEquals(assignment, deserializedAssignment);
+    }
+
+    /**
+     * Returns sorted and compacted string representation of given {@code col}. Two nearby numbers with difference at
+     * most 1 are compacted to one continuous segment. E.g. collection of [1, 2, 3, 5, 6, 7, 10] will be compacted to
+     * [1-3, 5-7, 10].
+     *
+     * @param col Collection of integers.
+     * @return Compacted string representation of given collections.
+     */
+    public static String compact(Collection<Integer> col) {
+        return compact(col, i -> i + 1);
+    }
+
+    /**
+     * Returns sorted and compacted string representation of given {@code col}. Two nearby numbers are compacted to one
+     * continuous segment. E.g. collection of [1, 2, 3, 5, 6, 7, 10] with {@code nextValFun = i -> i + 1} will be
+     * compacted to [1-3, 5-7, 10]. Equal numbers fall into the same segment. The given collection is not modified:
+     * a copy of it is sorted.
+     *
+     * @param col Collection of numbers.
+     * @param nextValFun Function to get nearby number.
+     * @return Compacted string representation of given collections, {@code "[]"} for an empty collection.
+     */
+    public static <T extends Number & Comparable<? super T>> String compact(
+        Collection<T> col,
+        Function<T, T> nextValFun
+    ) {
+        assert nonNull(col);
+        assert nonNull(nextValFun);
+
+        if (col.isEmpty())
+            return "[]";
+
+        // The builder never leaves this method, so the synchronized StringBuffer is not needed.
+        StringBuilder sb = new StringBuilder();
+        sb.append('[');
+
+        List<T> l = new ArrayList<>(col);
+        Collections.sort(l);
+
+        // [left, right] is the segment that is currently being extended.
+        T left = l.get(0), right = left;
+        for (int i = 1; i < l.size(); i++) {
+            T val = l.get(i);
+
+            if (right.compareTo(val) == 0 || nextValFun.apply(right).compareTo(val) == 0) {
+                right = val;
+                continue;
+            }
+
+            if (left.compareTo(right) == 0)
+                sb.append(left);
+            else
+                sb.append(left).append('-').append(right);
+
+            sb.append(',').append(' ');
+
+            left = right = val;
+        }
+
+        // Flush the last segment.
+        if (left.compareTo(right) == 0)
+            sb.append(left);
+        else
+            sb.append(left).append('-').append(right);
+
+        sb.append(']');
+
+        return sb.toString();
+    }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/app/Ignite.java b/modules/api/src/main/java/org/apache/ignite/app/Ignite.java
index c9613dd..2c98cfd 100644
--- a/modules/api/src/main/java/org/apache/ignite/app/Ignite.java
+++ b/modules/api/src/main/java/org/apache/ignite/app/Ignite.java
@@ -17,16 +17,16 @@
package org.apache.ignite.app;
-import org.apache.ignite.table.manager.TableManager;
+import org.apache.ignite.table.manager.IgniteTables;
/**
* Ignite node interface. Main entry-point for all Ignite APIs.
*/
public interface Ignite extends AutoCloseable {
/**
- * Gets a manager for tables.
+ * Gets an object for manipulating Ignite tables.
*
- * @return Table manager.
+ * @return Ignite tables.
*/
- TableManager tableManager();
+ IgniteTables tables();
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/network/NetworkConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/network/NetworkConfigurationSchema.java
index 1a1b51f..df36955 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/network/NetworkConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/network/NetworkConfigurationSchema.java
@@ -28,13 +28,13 @@ import org.apache.ignite.configuration.storage.ConfigurationType;
*/
@ConfigurationRoot(rootName = "network", type = ConfigurationType.LOCAL)
public class NetworkConfigurationSchema {
- /** */
+ /** Network port. */
@Min(1024)
@Max(0xFFFF)
@Value(hasDefault = true)
- public final int port = 3040;
+ public final int port = 47500;
- /** */
+ /** Cluster nodes. */
@Value(hasDefault = true)
- public String[] netClusterNodes = {"localhost:" + port};
+ public final String[] netClusterNodes = new String[0];
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/rest/RestConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/rest/RestConfigurationSchema.java
index d65360f..4f4150e 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/rest/RestConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/rest/RestConfigurationSchema.java
@@ -26,15 +26,16 @@ import org.apache.ignite.configuration.storage.ConfigurationType;
/**
* Configuration schema for REST endpoint subtree.
*/
+@SuppressWarnings("PMD.UnusedPrivateField")
@ConfigurationRoot(rootName = "rest", type = ConfigurationType.LOCAL)
public class RestConfigurationSchema {
- /** */
+ /** TCP port. */
@Min(1024)
@Max(0xFFFF)
@Value(hasDefault = true)
public final int port = 10300;
- /** */
+ /** TCP port range. */
@Min(0)
@Value(hasDefault = true)
public final int portRange = 0;
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/ClusterConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/ClusterConfigurationSchema.java
index 63278e3..3266d4a 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/ClusterConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/ClusterConfigurationSchema.java
@@ -28,5 +28,5 @@ import org.apache.ignite.configuration.storage.ConfigurationType;
public class ClusterConfigurationSchema {
/** List of unique names of those cluster nodes that will host distributed metastorage instances. */
@Value
- String[] metastorageClusterNodeNames;
+ String[] metastorageNodes;
}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/rest/RestConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/NodeConfigurationSchema.java
similarity index 69%
copy from modules/api/src/main/java/org/apache/ignite/configuration/schemas/rest/RestConfigurationSchema.java
copy to modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/NodeConfigurationSchema.java
index d65360f..5657252 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/rest/RestConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/runner/NodeConfigurationSchema.java
@@ -15,27 +15,24 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.schemas.rest;
+package org.apache.ignite.configuration.schemas.runner;
-import javax.validation.constraints.Max;
-import javax.validation.constraints.Min;
+import java.util.UUID;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
import org.apache.ignite.configuration.annotation.Value;
import org.apache.ignite.configuration.storage.ConfigurationType;
/**
- * Configuration schema for REST endpoint subtree.
+ * Local node configuration schema.
*/
-@ConfigurationRoot(rootName = "rest", type = ConfigurationType.LOCAL)
-public class RestConfigurationSchema {
- /** */
- @Min(1024)
- @Max(0xFFFF)
+@SuppressWarnings("PMD.UnusedPrivateField")
+@ConfigurationRoot(rootName = "node", type = ConfigurationType.LOCAL)
+public class NodeConfigurationSchema {
+ /** Unique local node name. */
@Value(hasDefault = true)
- public final int port = 10300;
+ final String name = UUID.randomUUID().toString();
- /** */
- @Min(0)
+ /** It is a copy of appropriate property from the cluster configuration. */
@Value(hasDefault = true)
- public final int portRange = 0;
+ final String[] metastorageNodes = new String[0];
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/configuration/extended/AutoAdjustConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
similarity index 64%
rename from modules/runner/src/main/java/org/apache/ignite/configuration/extended/AutoAdjustConfigurationSchema.java
rename to modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
index 87ebef5..835dc85 100644
--- a/modules/runner/src/main/java/org/apache/ignite/configuration/extended/AutoAdjustConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TableConfigurationSchema.java
@@ -15,21 +15,32 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.extended;
+package org.apache.ignite.configuration.schemas.table;
+import javax.validation.constraints.Max;
import javax.validation.constraints.Min;
import org.apache.ignite.configuration.annotation.Config;
import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Immutable;
-/** */
+/**
+ * Table configuration schema class.
+ */
@Config
-public class AutoAdjustConfigurationSchema {
- /** */
+public class TableConfigurationSchema {
+ /** Table name. */
@Value
- public boolean enabled;
+ @Immutable
+ public String name;
- /** */
- @Value
- @Min(value = 0, message = "Minimum value is 0")
- public int timeout;
+ /** Table partitions. */
+ @Min(0)
+ @Max(65000)
+ @Value(hasDefault = true)
+ public int partitions = 1024;
+
+ /** Count of table partition replicas; at least one replica is required. */
+ @Min(1)
+ @Value(hasDefault = true)
+ public int replicas = 1;
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/configuration/extended/LocalConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TablesConfigurationSchema.java
similarity index 71%
rename from modules/runner/src/main/java/org/apache/ignite/configuration/extended/LocalConfigurationSchema.java
rename to modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TablesConfigurationSchema.java
index 0097528..3ed2539 100644
--- a/modules/runner/src/main/java/org/apache/ignite/configuration/extended/LocalConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/table/TablesConfigurationSchema.java
@@ -15,23 +15,18 @@
* limitations under the License.
*/
-package org.apache.ignite.configuration.extended;
+package org.apache.ignite.configuration.schemas.table;
-import org.apache.ignite.configuration.annotation.ConfigValue;
import org.apache.ignite.configuration.annotation.ConfigurationRoot;
+import org.apache.ignite.configuration.annotation.NamedConfigValue;
import org.apache.ignite.configuration.storage.ConfigurationType;
-/**
- *
- */
+/** Tables configuration schema. */
@SuppressWarnings("PMD.UnusedPrivateField")
-@ConfigurationRoot(rootName = "local", type = ConfigurationType.LOCAL)
-public class LocalConfigurationSchema {
- /** */
- @ConfigValue
- private BaselineConfigurationSchema baseline;
+@ConfigurationRoot(rootName = "table", type = ConfigurationType.DISTRIBUTED)
+public class TablesConfigurationSchema {
- /** */
- @ConfigValue
- private DataStorageConfigurationSchema dataStorage;
+ /** List of configured tables. */
+ @NamedConfigValue
+ TableConfigurationSchema tables;
}
diff --git a/modules/api/src/main/java/org/apache/ignite/table/manager/TableManager.java b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
similarity index 51%
rename from modules/api/src/main/java/org/apache/ignite/table/manager/TableManager.java
rename to modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
index f7fcf50..24b20f6 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/manager/TableManager.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
@@ -15,10 +15,42 @@
* limitations under the License.
*/
+/**
+ * Interface for managing tables.
+ */
package org.apache.ignite.table.manager;
+import java.util.List;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.table.Table;
+
/**
* Interface that provides methods for managing tables.
*/
-public interface TableManager {
+public interface IgniteTables {
+ /**
+ * Creates a cluster table.
+ * If a table with this name already exists, it is changed.
+ *
+ * @param name Table name.
+ * @param tableInitChange Table changer.
+ * @return Table.
+ */
+ Table createTable(String name, Consumer<TableChange> tableInitChange);
+
+ /**
+ * Gets a list of all started tables.
+ *
+ * @return List of tables.
+ */
+ List<Table> tables();
+
+ /**
+ * Gets a table by name, if it was created before.
+ *
+ * @param name Name of the table.
+ * @return Table with the corresponding name, or {@code null} if the table isn't created.
+ */
+ Table table(String name);
}
diff --git a/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java b/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
index e2b78d8..be4d0fa 100644
--- a/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
+++ b/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
@@ -17,8 +17,10 @@
package org.apache.ignite.internal.baseline;
+import java.util.Collection;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
/**
@@ -54,5 +56,14 @@ import org.apache.ignite.network.ClusterService;
this.metastorageMgr = metastorageMgr;
this.clusterSvc = clusterSvc;
}
+
+ /**
+ * Gets all nodes which participate in the baseline and may process user data.
+ *
+ * @return All nodes which were in baseline.
+ */
+ public Collection<ClusterNode> nodes() {
+ return clusterSvc.topologyService().allMembers();
+ }
}
diff --git a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationRegistry.java b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationRegistry.java
index 5d17e1d..a1a913e 100644
--- a/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationRegistry.java
+++ b/modules/configuration/src/main/java/org/apache/ignite/configuration/ConfigurationRegistry.java
@@ -57,7 +57,7 @@ import static org.apache.ignite.configuration.internal.util.ConfigurationUtil.in
/** */
public class ConfigurationRegistry {
- /** Logger. */
+ /** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(ConfigurationRegistry.class);
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java b/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
index a97d78b..954d5d7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/tostring/IgniteToStringBuilder.java
@@ -1872,7 +1872,6 @@ public class IgniteToStringBuilder {
* @return Descriptor for the class.
* @throws IllegalAccessException If failed.
*/
- @SuppressWarnings({"TooBroadScope"})
private static <T> ClassDescriptor getClassDescriptor(Class<T> cls) throws IllegalAccessException {
assert cls != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index 0457241..dcbb3d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -179,6 +179,11 @@ public final class ArrayUtils {
}
};
+ /** */
+ public static boolean empty(byte[] arr) {
+ return arr == null || arr.length == 0;
+ }
+
/**
* Stub.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 60d637b..a490af3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -17,13 +17,21 @@
package org.apache.ignite.internal.util;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import org.apache.ignite.lang.IgniteLogger;
/**
* Collection of utility methods used throughout the system.
*/
public class IgniteUtils {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(IgniteUtils.class);
+
/** Version of the JDK. */
private static String jdkVer;
@@ -167,4 +175,47 @@ public class IgniteUtils {
return hash(val);
}
+
+ /**
+ * Serializes an object to byte array using native java serialization mechanism.
+ *
+ * @param obj Object to serialize.
+ * @return Byte array.
+ */
+ public static byte[] toBytes(Object obj) {
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+ try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+ out.writeObject(obj);
+
+ out.flush();
+
+ return bos.toByteArray();
+ }
+ }
+ catch (Exception e) {
+ LOG.warn("Could not serialize a class [cls=" + obj.getClass().getName() + "]", e);
+
+ return null;
+ }
+ }
+
+ /**
+ * Deserializes an object from byte array using native java serialization mechanism.
+ *
+ * @param bytes Byte array.
+ * @return Object.
+ */
+ public static Object fromBytes(byte[] bytes) {
+ try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
+ try (ObjectInputStream in = new ObjectInputStream(bis)) {
+ return in.readObject();
+ }
+ }
+ catch (Exception e) {
+ LOG.warn("Could not deserialize an object", e);
+
+ return null;
+ }
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
new file mode 100644
index 0000000..b45bd35
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/lang/IgniteBiTuple.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Convenience class representing mutable tuple of two values.
+ */
+public class IgniteBiTuple<V1, V2> implements Map<V1, V2>, Map.Entry<V1, V2>,
+ Iterable<Object>, Externalizable, Cloneable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** First value. */
+ private V1 val1;
+
+ /** Second value. */
+ private V2 val2;
+
+ /**
+ * Empty constructor required by {@link Externalizable}.
+ */
+ public IgniteBiTuple() {
+ // No-op.
+ }
+
+ /**
+ * Fully initializes this tuple.
+ *
+ * @param val1 First value.
+ * @param val2 Second value.
+ */
+ public IgniteBiTuple(@Nullable V1 val1, @Nullable V2 val2) {
+ this.val1 = val1;
+ this.val2 = val2;
+ }
+
+ /**
+ * Swaps values.
+ *
+ * @return New tuple with swapped values.
+ */
+ public IgniteBiTuple<V2, V1> swap() {
+ return new IgniteBiTuple<>(val2, val1);
+ }
+
+ /**
+ * Gets first value.
+ *
+ * @return First value.
+ */
+ public V1 get1() {
+ return val1;
+ }
+
+ /**
+ * Gets second value.
+ *
+ * @return Second value.
+ */
+ public V2 get2() {
+ return val2;
+ }
+
+ /**
+ * Sets first value.
+ *
+ * @param val1 First value.
+ */
+ public void set1(@Nullable V1 val1) {
+ this.val1 = val1;
+ }
+
+ /**
+ * Sets second value.
+ *
+ * @param val2 Second value.
+ */
+ public void set2(@Nullable V2 val2) {
+ this.val2 = val2;
+ }
+
+ /**
+ * Sets both values in the tuple.
+ *
+ * @param val1 First value.
+ * @param val2 Second value.
+ */
+ public void set(@Nullable V1 val1, @Nullable V2 val2) {
+ set1(val1);
+ set2(val2);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V1 getKey() {
+ return val1;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V2 getValue() {
+ return val2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public V2 setValue(@Nullable V2 val) {
+ V2 old = val2;
+
+ set2(val);
+
+ return old;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Iterator<Object> iterator() {
+ return new Iterator<Object>() {
+ /** */
+ private int nextIdx = 1;
+
+ @Override public boolean hasNext() {
+ return nextIdx < 3;
+ }
+
+ @Nullable @Override public Object next() {
+ if (!hasNext())
+ throw new NoSuchElementException();
+
+ Object res = null;
+
+ if (nextIdx == 1)
+ res = get1();
+ else if (nextIdx == 2)
+ res = get2();
+
+ nextIdx++;
+
+ return res;
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ /** {@inheritDoc} */
+ @Override public int size() {
+ return val1 == null && val2 == null ? 0 : 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isEmpty() {
+ return size() == 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsKey(Object key) {
+ return eq(val1, key);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean containsValue(Object val) {
+ return eq(val2, val);
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V2 get(Object key) {
+ return containsKey(key) ? val2 : null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V2 put(V1 key, V2 val) {
+ V2 old = containsKey(key) ? val2 : null;
+
+ set(key, val);
+
+ return old;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public V2 remove(Object key) {
+ if (containsKey(key)) {
+ V2 v2 = val2;
+
+ val1 = null;
+ val2 = null;
+
+ return v2;
+ }
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putAll(Map<? extends V1, ? extends V2> m) {
+ assert m != null : "m";
+ assert m.size() <= 1 : "m.size() <= 1";
+
+ for (Entry<? extends V1, ? extends V2> e : m.entrySet())
+ put(e.getKey(), e.getValue());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clear() {
+ val1 = null;
+ val2 = null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<V1> keySet() {
+ return Collections.singleton(val1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<V2> values() {
+ return Collections.singleton(val2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Set<Entry<V1, V2>> entrySet() {
+ return isEmpty() ?
+ Collections.<Entry<V1,V2>>emptySet() :
+ Collections.<Entry<V1, V2>>singleton(this);
+ }
+
+ /**
+ * Convert tuple to array.
+ *
+ * @return Array with two elements from the tuple.
+ */
+ public Object[] toArray() {
+ return new Object[]{val1, val2};
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object clone() {
+ try {
+ return super.clone();
+ }
+ catch (CloneNotSupportedException ignore) {
+ throw new InternalError();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(val1);
+ out.writeObject(val2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ val1 = (V1)in.readObject();
+ val2 = (V2)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return val1 == null ? 0 : val1.hashCode() * 31 + (val2 == null ? 0 : val2.hashCode());
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (!(o instanceof IgniteBiTuple))
+ return false;
+
+ IgniteBiTuple<?, ?> t = (IgniteBiTuple<?, ?>)o;
+
+ // Both nulls or equals.
+ return eq(val1, t.val1) && eq(val2, t.val2);
+ }
+
+ /**
+ * Tests whether specified arguments are equal, or both {@code null}.
+ *
+ * @param o1 Object to compare.
+ * @param o2 Object to compare.
+ * @return Returns {@code true} if the specified arguments are equal, or both {@code null}.
+ */
+ public static boolean eq(@Nullable Object o1, @Nullable Object o2) {
+ return o1 == null ? o2 == null : o2 != null && (o1 == o2 || o1.equals(o2));
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "IgniteBiTuple [val1=" + val1 + ", val2=" + val2 + ']';
+ }
+}
diff --git a/modules/metastorage/pom.xml b/modules/metastorage/pom.xml
index a59ba89..b31a74c 100644
--- a/modules/metastorage/pom.xml
+++ b/modules/metastorage/pom.xml
@@ -52,5 +52,10 @@
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-metastorage-client</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
</dependencies>
</project>
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index d613b3e..d56e51a 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -18,13 +18,15 @@
package org.apache.ignite.internal.metastorage;
import java.util.concurrent.CompletableFuture;
-
+import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.metastorage.client.MetaStorageService;
+import org.apache.ignite.metastorage.common.Condition;
+import org.apache.ignite.metastorage.common.Entry;
import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operation;
import org.apache.ignite.metastorage.common.WatchListener;
import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.internal.raft.Loza;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -93,6 +95,48 @@ import org.jetbrains.annotations.Nullable;
}
/**
+ * Proxies the invocation to metastorage.
+ *
+ * @param key The target key.
+ * @return Metastorage entry.
+ */
+ public synchronized CompletableFuture<Entry> get(@Nullable Key key) {
+ // TODO: IGNITE-14446 Implement DMS manager with watch registry.
+ return null;
+ }
+
+ /**
+ * Proxies the invocation to metastorage.
+ *
+ * @param key The target key.
+ * @param value The value to set.
+ * @return Future for the put operation.
+ */
+ public CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) {
+ // TODO: IGNITE-14446 Implement DMS manager with watch registry.
+ return null;
+ }
+
+ /**
+ * Invokes a service operation for metastorage.
+ *
+ * @param key Key in metastorage.
+ * @param condition Condition to process.
+ * @param success Success operation.
+ * @param failure Failure operation.
+ * @return Future which will complete when appropriate final operation would be invoked.
+ */
+ public CompletableFuture<Boolean> invoke(
+ @NotNull Key key,
+ @NotNull Condition condition,
+ @NotNull Operation success,
+ @NotNull Operation failure
+ ) {
+ // TODO: IGNITE-14446 Implement DMS manager with watch registry.
+ return null;
+ }
+
+ /**
* Unregister subscription for the given identifier.
*
* @param id Subscription identifier.
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/network/MetaStorageMessageTypes.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/network/MetaStorageMessageTypes.java
deleted file mode 100644
index 6113fa0..0000000
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/network/MetaStorageMessageTypes.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.metastorage.network;
-
-/**
- * Metastorage message types
- */
-// TODO: IGNITE-14088: Uncomment and use real serializer factory
-//@DirectlyMarshallableTypes(moduleType = 11)
-public enum MetaStorageMessageTypes {
- /** */
- CLUSTER_INIT_REQUEST((short)1100);
-
- /** */
- private short msgType;
-
- /**
- * Constructor.
- *
- * @param msgType Message type.
- */
- MetaStorageMessageTypes(short msgType) {
- this.msgType = msgType;
- }
-
- /**
- * @return Message type.
- */
- public short msgType() {
- return msgType;
- }
-}
diff --git a/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java b/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
index 205c32f..62caff1 100644
--- a/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
+++ b/modules/network/src/main/java/org/apache/ignite/network/ClusterNode.java
@@ -19,6 +19,7 @@ package org.apache.ignite.network;
import java.io.Serializable;
import java.util.Objects;
import org.apache.ignite.internal.tostring.S;
+import java.util.UUID;
/**
* Representation of a node in a cluster.
@@ -73,6 +74,15 @@ public class ClusterNode implements Serializable {
return port == that.port && name.equals(that.name) && host.equals(that.host);
}
+ /**
+ * Creates node UUID.
+ *
+ * @return Node UUID identifier.
+ */
+ public UUID id() {
+ return new UUID(name.hashCode(), name.substring(name.length() / 2).hashCode());
+ }
+
/** {@inheritDoc} */
@Override public int hashCode() {
return Objects.hash(name, host, port);
diff --git a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
index f62904b..5f1173d 100644
--- a/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
+++ b/modules/raft-client/src/main/java/org/apache/ignite/raft/client/service/impl/RaftGroupServiceImpl.java
@@ -25,8 +25,8 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.message.NetworkMessage;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
@@ -58,7 +58,7 @@ import static java.util.concurrent.ThreadLocalRandom.current;
* The implementation of {@link RaftGroupService}
*/
public class RaftGroupServiceImpl implements RaftGroupService {
- /** Logger. */
+ /** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceImpl.class);
/** */
diff --git a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
index cb93874..344dd55 100644
--- a/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
+++ b/modules/raft-client/src/test/java/org/apache/ignite/raft/client/service/RaftGroupServiceTest.java
@@ -23,9 +23,9 @@ import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
-import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
import org.apache.ignite.raft.client.WriteCommand;
@@ -62,7 +62,7 @@ import static org.mockito.Mockito.when;
*/
@ExtendWith(MockitoExtension.class)
public class RaftGroupServiceTest {
- /** */
+ /** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(RaftGroupServiceTest.class);
/** */
diff --git a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
index d24d6f9..941b23c 100644
--- a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
+++ b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ITRaftCounterServerTest.java
@@ -20,8 +20,8 @@ package org.apache.ignite.raft.server;
import java.util.List;
import java.util.Map;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.ClusterServiceFactory;
import org.apache.ignite.network.message.MessageSerializationRegistry;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
@@ -42,7 +42,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** */
class ITRaftCounterServerTest {
- /** */
+ /** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(ITRaftCounterServerTest.class);
/** */
@@ -69,6 +69,9 @@ class ITRaftCounterServerTest {
/** */
private static final String SERVER_ID = "testServer";
+ /** Server id 2. */
+ private static final String SERVER_ID_2 = "testServer2";
+
/** */
private static final String CLIENT_ID = "testClient";
@@ -78,6 +81,9 @@ class ITRaftCounterServerTest {
/** */
private static final String COUNTER_GROUP_ID_1 = "counter1";
+ /** Counter group id 3. */
+ private static final String COUNTER_GROUP_ID_3 = "counter3";
+
/**
* @param testInfo Test info.
*/
@@ -85,8 +91,7 @@ class ITRaftCounterServerTest {
void before(TestInfo testInfo) {
LOG.info(">>>> Starting test " + testInfo.getTestMethod().orElseThrow().getName());
- server = new RaftServerImpl(SERVER_ID,
- 20100,
+ server = new RaftServerImpl(startClient(SERVER_ID, 20100, List.of()),
FACTORY,
1000,
Map.of(COUNTER_GROUP_ID_0, new CounterCommandListener(), COUNTER_GROUP_ID_1, new CounterCommandListener()));
@@ -106,6 +111,45 @@ class ITRaftCounterServerTest {
}
/**
+ * Checks that two RAFT servers hosting different counter groups can be served through the same client.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testTwoServer() throws Exception {
+ RaftServer raftServer2 = new RaftServerImpl(startClient(SERVER_ID_2, 20102, List.of("localhost:20100")),
+ FACTORY,
+ 1000,
+ Map.of(COUNTER_GROUP_ID_3, new CounterCommandListener()));
+
+ // Stop the second server even when an assertion fails, so that it does not outlive this test.
+ try {
+ assertTrue(waitForTopology(client, 3, 10_000), "Nodes: " + client.topologyService().allMembers().size());
+
+ Peer server = new Peer(client.topologyService().allMembers().stream().filter(m -> SERVER_ID.equals(m.name())).findFirst().orElseThrow());
+ Peer server2 = new Peer(client.topologyService().allMembers().stream().filter(m -> SERVER_ID_2.equals(m.name())).findFirst().orElseThrow());
+
+ RaftGroupService service = new RaftGroupServiceImpl(COUNTER_GROUP_ID_0, client, FACTORY, 1000,
+ List.of(server), true, 200);
+
+ RaftGroupService service2 = new RaftGroupServiceImpl(COUNTER_GROUP_ID_3, client, FACTORY, 1000,
+ List.of(server2), true, 200);
+
+ Peer leader = service.leader();
+ Peer leader2 = service2.leader();
+
+ assertEquals(server.getNode().name(), leader.getNode().name());
+ assertEquals(server2.getNode().name(), leader2.getNode().name());
+
+ assertEquals(0, service.<Integer>run(new GetValueCommand()).get());
+ assertEquals(0, service2.<Integer>run(new GetValueCommand()).get());
+
+ assertEquals(2, service.<Integer>run(new IncrementAndGetCommand(2)).get());
+ assertEquals(2, service2.<Integer>run(new IncrementAndGetCommand(2)).get());
+
+ assertEquals(2, service.<Integer>run(new GetValueCommand()).get());
+ assertEquals(2, service2.<Integer>run(new GetValueCommand()).get());
+ }
+ finally {
+ raftServer2.shutdown();
+ }
+ }
+
+ /**
*/
@Test
public void testRefreshLeader() {
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index e91ff2d..3d2ce2c 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -17,16 +17,39 @@
package org.apache.ignite.internal.raft;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
+import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.server.RaftServer;
+import org.apache.ignite.raft.server.impl.RaftServerImpl;
/**
* Best raft manager ever since 1982.
*/
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class Loza {
+public class Loza {
+ /** Client message factory shared by the RAFT server and by every RAFT group client created here. */
+ private static final RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
+
+ /** Timeout. */
+ private static final int TIMEOUT = 1000;
+
+ /** Retry delay. */
+ private static final int DELAY = 200;
+
/** Cluster network service. */
private final ClusterService clusterNetSvc;
+ /** Raft server. */
+ private RaftServer raftServer;
+
/**
* Constructor.
*
@@ -34,5 +57,33 @@ import org.apache.ignite.network.ClusterService;
*/
public Loza(ClusterService clusterNetSvc) {
this.clusterNetSvc = clusterNetSvc;
+
+ this.raftServer = new RaftServerImpl(clusterNetSvc, FACTORY, 1000, Map.of()); // 1000 is the server queue size (not TIMEOUT); no group listeners are registered up front.
+ }
+
+ /**
+ * Creates a RAFT group.
+ *
+ * <p>Only the first peer hosts the group for now: when that peer is the local node, the listener is registered
+ * on the local RAFT server. A client built over the whole peer list is returned in either case.
+ *
+ * @param groupId RAFT group id.
+ * @param peers Group peers, must not be empty.
+ * @param lsnr Group listener.
+ * @return A RAFT group client.
+ */
+ public RaftGroupService startRaftGroup(String groupId, List<ClusterNode> peers, RaftGroupCommandListener lsnr) {
+ // A single-node group is the supported case, so only an empty peer list is invalid.
+ assert !peers.isEmpty() : "RAFT group peers must not be empty [groupId=" + groupId + ']';
+
+ //Now we are using only one node in a raft group.
+ if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name()))
+ raftServer.setListener(groupId, lsnr);
+
+ return new RaftGroupServiceImpl(
+ groupId,
+ clusterNetSvc,
+ FACTORY,
+ TIMEOUT,
+ peers.stream().map(Peer::new).collect(Collectors.toList()),
+ true,
+ DELAY
+ );
}
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
index 943e539..a861314 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/server/impl/RaftServerImpl.java
@@ -27,11 +27,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiConsumer;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterNode;
-import org.apache.ignite.network.message.MessageSerializationRegistry;
-import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
+import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.Peer;
import org.apache.ignite.raft.client.RaftErrorCode;
@@ -52,16 +49,13 @@ import org.jetbrains.annotations.NotNull;
* A single node server implementation.
*/
public class RaftServerImpl implements RaftServer {
- /** */
+ /** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(RaftServerImpl.class);
/** */
private final String id;
/** */
- private final int localPort;
-
- /** */
private final RaftClientMessageFactory clientMsgFactory;
/** */
@@ -83,24 +77,20 @@ public class RaftServerImpl implements RaftServer {
private final Thread writeWorker;
/**
- * @param id Server id.
- * @param localPort Local port.
+ * @param service Network service.
* @param clientMsgFactory Client message factory.
* @param queueSize Queue size.
* @param listeners Command listeners.
*/
public RaftServerImpl(
- @NotNull String id,
- int localPort,
+ ClusterService service,
@NotNull RaftClientMessageFactory clientMsgFactory,
int queueSize,
Map<String, RaftGroupCommandListener> listeners
) {
- Objects.requireNonNull(id);
Objects.requireNonNull(clientMsgFactory);
- this.id = id;
- this.localPort = localPort;
+ this.id = service.topologyService().localMember().name();
this.clientMsgFactory = clientMsgFactory;
if (listeners != null)
@@ -109,18 +99,7 @@ public class RaftServerImpl implements RaftServer {
readQueue = new ArrayBlockingQueue<>(queueSize);
writeQueue = new ArrayBlockingQueue<>(queueSize);
- // TODO: IGNITE-14088: Uncomment and use real serializer factory
- var serializationRegistry = new MessageSerializationRegistry();
-// .registerFactory((short)1000, ???)
-// .registerFactory((short)1001, ???)
-// .registerFactory((short)1005, ???)
-// .registerFactory((short)1006, ???)
-// .registerFactory((short)1009, ???);
-
- var context = new ClusterLocalConfiguration(id, localPort, List.of(), serializationRegistry);
- var factory = new ScaleCubeClusterServiceFactory();
-
- server = factory.createClusterService(context);
+ server = service;
server.messagingService().addMessageHandler((message, sender, correlationId) -> {
if (message instanceof GetLeaderRequest) {
@@ -129,9 +108,9 @@ public class RaftServerImpl implements RaftServer {
server.messagingService().send(sender, resp, correlationId);
}
else if (message instanceof ActionRequest) {
- ActionRequest<?> req0 = (ActionRequest<?>) message;
+ ActionRequest<?> req0 = (ActionRequest<?>)message;
- RaftGroupCommandListener lsnr = listeners.get(req0.groupId());
+ RaftGroupCommandListener lsnr = RaftServerImpl.this.listeners.get(req0.groupId());
if (lsnr == null) {
sendError(sender, correlationId, RaftErrorCode.ILLEGAL_STATE);
@@ -161,7 +140,7 @@ public class RaftServerImpl implements RaftServer {
writeWorker.setDaemon(true);
writeWorker.start();
- LOG.info("Started replication server [id=" + id + ", localPort=" + localPort + ']');
+ LOG.info("Started replication server [node=" + server.topologyService().localMember() + ']');
}
/** {@inheritDoc} */
@@ -189,7 +168,7 @@ public class RaftServerImpl implements RaftServer {
writeWorker.interrupt();
writeWorker.join();
- LOG.info("Stopped replication server [id=" + id + ", localPort=" + localPort + ']');
+ LOG.info("Stopped replication server [node=" + server.topologyService().localMember() + ']');
}
private <T extends Command> void handleActionRequest(
diff --git a/modules/rest/pom.xml b/modules/rest/pom.xml
index ca77cc8..ea3d2a7 100644
--- a/modules/rest/pom.xml
+++ b/modules/rest/pom.xml
@@ -112,17 +112,6 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
</plugin>
-
- <!--
- Include javadoc from generated sources
- -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <sourcepath>src/main/java:target/generated-sources/annotations</sourcepath>
- </configuration>
- </plugin>
</plugins>
</build>
</project>
diff --git a/modules/rest/src/main/java/org/apache/ignite/rest/configuration/InMemoryConfigurationStorage.java b/modules/rest/src/main/java/org/apache/ignite/rest/configuration/InMemoryConfigurationStorage.java
deleted file mode 100644
index b4a3e3e..0000000
--- a/modules/rest/src/main/java/org/apache/ignite/rest/configuration/InMemoryConfigurationStorage.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.rest.configuration;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.ignite.configuration.storage.ConfigurationStorage;
-import org.apache.ignite.configuration.storage.ConfigurationStorageListener;
-import org.apache.ignite.configuration.storage.ConfigurationType;
-import org.apache.ignite.configuration.storage.Data;
-import org.apache.ignite.configuration.storage.StorageException;
-
-/**
- * Temporary configuration storage.
- */
-public class InMemoryConfigurationStorage implements ConfigurationStorage {
- /** Map to store values. */
- private Map<String, Serializable> map = new ConcurrentHashMap<>();
-
- /** Change listeners. */
- private List<ConfigurationStorageListener> listeners = new CopyOnWriteArrayList<>();
-
- /** Storage version. */
- private AtomicLong version = new AtomicLong(0);
-
- /** {@inheritDoc} */
- @Override public synchronized Data readAll() throws StorageException {
- return new Data(new HashMap<>(map), version.get(), 0);
- }
-
- /** {@inheritDoc} */
- @Override public synchronized CompletableFuture<Boolean> write(Map<String, Serializable> newValues, long version) {
- if (version != this.version.get())
- return CompletableFuture.completedFuture(false);
-
- for (Map.Entry<String, Serializable> entry : newValues.entrySet()) {
- if (entry.getValue() != null)
- map.put(entry.getKey(), entry.getValue());
- else
- map.remove(entry.getKey());
- }
-
- this.version.incrementAndGet();
-
- listeners.forEach(listener -> listener.onEntriesChanged(new Data(newValues, this.version.get(), 0)));
-
- return CompletableFuture.completedFuture(true);
- }
-
- /** {@inheritDoc} */
- @Override public void addListener(ConfigurationStorageListener listener) {
- listeners.add(listener);
- }
-
- /** {@inheritDoc} */
- @Override public void removeListener(ConfigurationStorageListener listener) {
- listeners.remove(listener);
- }
-
- /** {@inheritDoc} */
- @Override public void notifyApplied(long storageRevision) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public ConfigurationType type() {
- return ConfigurationType.LOCAL;
- }
-}
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index d233d3a..841eba9 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -111,18 +111,5 @@
<filtering>true</filtering>
</resource>
</resources>
-
- <plugins>
- <!--
- Include javadoc from generated sources
- -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <sourcepath>src/main/java:target/generated-sources/annotations</sourcepath>
- </configuration>
- </plugin>
- </plugins>
</build>
</project>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
index 12f4f4a..166e5c5 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/IgnitionTest.java
@@ -33,6 +33,10 @@ class IgnitionTest {
private final String[] nodesBootstrapCfg =
{
"{\n" +
+ " \"node\": {\n" +
+ " \"name\":node0,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
" \"network\": {\n" +
" \"port\":3344,\n" +
" \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
@@ -40,6 +44,10 @@ class IgnitionTest {
"}",
"{\n" +
+ " \"node\": {\n" +
+ " \"name\":node1,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
" \"network\": {\n" +
" \"port\":3345,\n" +
" \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
@@ -47,6 +55,10 @@ class IgnitionTest {
"}",
"{\n" +
+ " \"node\": {\n" +
+ " \"name\":node2,\n" +
+ " \"metastorageNodes\":[ \"node0\", \"node1\" ]\n" +
+ " },\n" +
" \"network\": {\n" +
" \"port\":3346,\n" +
" \"netClusterNodes\":[ \"localhost:3344\", \"localhost:3345\", \"localhost:3346\" ]\n" +
diff --git a/modules/runner/src/main/java/org/apache/ignite/configuration/extended/BaselineConfigurationSchema.java b/modules/runner/src/main/java/org/apache/ignite/configuration/extended/BaselineConfigurationSchema.java
deleted file mode 100644
index 363305d..0000000
--- a/modules/runner/src/main/java/org/apache/ignite/configuration/extended/BaselineConfigurationSchema.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.configuration.extended;
-
-import org.apache.ignite.configuration.annotation.Config;
-import org.apache.ignite.configuration.annotation.ConfigValue;
-
-/** */
-@SuppressWarnings("PMD.UnusedPrivateField")
-@Config
-public class BaselineConfigurationSchema {
- /** */
- @ConfigValue
- private AutoAdjustConfigurationSchema autoAdjust;
-}
diff --git a/modules/runner/src/main/java/org/apache/ignite/configuration/extended/DataStorageConfigurationSchema.java b/modules/runner/src/main/java/org/apache/ignite/configuration/extended/DataStorageConfigurationSchema.java
deleted file mode 100644
index 2a8b678..0000000
--- a/modules/runner/src/main/java/org/apache/ignite/configuration/extended/DataStorageConfigurationSchema.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.configuration.extended;
-
-import org.apache.ignite.configuration.annotation.Config;
-import org.apache.ignite.configuration.annotation.Value;
-
-/** */
-@Config
-public class DataStorageConfigurationSchema {
- /** */
- @Value
- public int pageSize;
-
- /** */
- @Value
- public String storagePath;
-
- /** */
- @Value
- public String walPath;
-}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c60026c..7ada28b 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -18,24 +18,24 @@
package org.apache.ignite.internal.app;
import org.apache.ignite.app.Ignite;
-import org.apache.ignite.table.manager.TableManager;
+import org.apache.ignite.table.manager.IgniteTables;
/**
* Ignite internal implementation.
*/
public class IgniteImpl implements Ignite {
/** Distributed table manager. */
- private final TableManager distributedTblMgr;
+ private final IgniteTables distributedTblMgr;
/**
* @param TblMgr Table manager.
*/
- IgniteImpl(TableManager TblMgr) {
+ IgniteImpl(IgniteTables TblMgr) {
this.distributedTblMgr = TblMgr;
}
/** {@inheritDoc} */
- @Override public TableManager tableManager() {
+ @Override public IgniteTables tables() {
return distributedTblMgr;
}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index b6b2622..4c88f2f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.app;
+import io.netty.util.internal.StringUtil;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -24,33 +25,39 @@ import java.util.List;
import java.util.stream.Collectors;
import org.apache.ignite.app.Ignite;
import org.apache.ignite.app.Ignition;
-import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.internal.ConfigurationManager;
-import org.apache.ignite.configuration.storage.ConfigurationType;
import org.apache.ignite.configuration.schemas.network.NetworkConfiguration;
import org.apache.ignite.configuration.schemas.network.NetworkView;
-import org.apache.ignite.internal.affinity.AffinityManager;
+import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
+import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.configuration.storage.ConfigurationStorage;
-import org.apache.ignite.internal.table.distributed.TableManagerImpl;
-import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.configuration.storage.ConfigurationType;
+import org.apache.ignite.internal.affinity.AffinityManager;
+import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.storage.DistributedConfigurationStorage;
+import org.apache.ignite.internal.storage.LocalConfigurationStorage;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterLocalConfiguration;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.message.MessageSerializationRegistry;
import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
-import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.storage.DistributedConfigurationStorage;
-import org.apache.ignite.internal.storage.LocalConfigurationStorage;
-import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.table.manager.TableManager;
+import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.utils.IgniteProperties;
/**
* Implementation of an entry point for handling grid lifecycle.
*/
public class IgnitionImpl implements Ignition {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(IgnitionImpl.class);
+
/** */
private static final String[] BANNER = new String[] {
"",
@@ -71,9 +78,6 @@ public class IgnitionImpl implements Ignition {
/** */
private static final String VER_KEY = "version";
- /** */
- private static final IgniteLogger LOG = IgniteLogger.forClass(IgnitionImpl.class);
-
/** {@inheritDoc} */
@Override public synchronized Ignite start(String jsonStrBootstrapCfg) {
ackBanner();
@@ -83,7 +87,12 @@ public class IgnitionImpl implements Ignition {
boolean cfgBootstrappedFromPds = vaultMgr.bootstrapped();
- List<RootKey<?, ?>> rootKeys = new ArrayList<>(Collections.singletonList(NetworkConfiguration.KEY));
+ List<RootKey<?, ?>> rootKeys = Arrays.asList(
+ NetworkConfiguration.KEY,
+ NodeConfiguration.KEY,
+ ClusterConfiguration.KEY,
+ TablesConfiguration.KEY
+ );
List<ConfigurationStorage> configurationStorages =
new ArrayList<>(Collections.singletonList(new LocalConfigurationStorage(vaultMgr)));
@@ -108,16 +117,23 @@ public class IgnitionImpl implements Ignition {
var serializationRegistry = new MessageSerializationRegistry();
+ String localNodeName = locConfigurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
+ .name().value();
+
+ assert !StringUtil.isNullOrEmpty(localNodeName) : "Node local name is empty";
+
// Network startup.
ClusterService clusterNetSvc = new ScaleCubeClusterServiceFactory().createClusterService(
new ClusterLocalConfiguration(
- "Node" + netConfigurationView.port(),
+ localNodeName,
netConfigurationView.port(),
Arrays.asList(netConfigurationView.netClusterNodes()),
serializationRegistry
)
);
+ clusterNetSvc.start();
+
// Raft Component startup.
Loza raftMgr = new Loza(clusterNetSvc);
@@ -138,17 +154,17 @@ public class IgnitionImpl implements Ignition {
BaselineManager baselineMgr = new BaselineManager(configurationMgr, metaStorageMgr, clusterNetSvc);
// Affinity manager startup.
- new AffinityManager(configurationMgr, metaStorageMgr, baselineMgr);
+ new AffinityManager(configurationMgr, metaStorageMgr, baselineMgr, vaultMgr);
SchemaManager schemaMgr = new SchemaManager(configurationMgr);
// Distributed table manager startup.
- TableManager distributedTblMgr = new TableManagerImpl(
+ IgniteTables distributedTblMgr = new TableManager(
configurationMgr,
- clusterNetSvc,
metaStorageMgr,
schemaMgr,
- raftMgr
+ raftMgr,
+ vaultMgr
);
// TODO IGNITE-14579 Start rest manager.
@@ -156,8 +172,6 @@ public class IgnitionImpl implements Ignition {
// Deploy all resisted watches cause all components are ready and have registered their listeners.
metaStorageMgr.deployWatches();
- clusterNetSvc.start();
-
ackSuccessStart();
return new IgniteImpl(distributedTblMgr);
diff --git a/modules/runner/src/main/java/org/apache/ignite/utils/IgniteProperties.java b/modules/runner/src/main/java/org/apache/ignite/utils/IgniteProperties.java
index 4f6766c..725ceec 100644
--- a/modules/runner/src/main/java/org/apache/ignite/utils/IgniteProperties.java
+++ b/modules/runner/src/main/java/org/apache/ignite/utils/IgniteProperties.java
@@ -62,6 +62,7 @@ public class IgniteProperties {
/**
* Returns property value for a given key or {@code null} if nothing was found.
*
+ * @param key Key.
* @return Value or {@code null}.
*/
public static String get(String key) {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 86672c7..92f4e10 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -21,6 +21,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
/**
@@ -67,7 +69,11 @@ public class ByteBufferRow implements BinaryRow {
/** {@inheritDoc} */
@Override public void writeTo(OutputStream stream) throws IOException {
- throw new UnsupportedOperationException("Not implemented yet.");
+ WritableByteChannel channel = Channels.newChannel(stream);
+
+ // Write through a rewound duplicate: it shares the content but has its own position, so the row's
+ // buffer is never moved, even when the write fails with an I/O error half-way.
+ ByteBuffer src = buf.duplicate();
+
+ src.rewind();
+
+ channel.write(src);
}
/** {@inheritDoc} */
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index f5831c4..da099e4 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.schema;
+import java.util.UUID;
import org.apache.ignite.configuration.internal.ConfigurationManager;
/**
@@ -27,6 +28,9 @@ import org.apache.ignite.configuration.internal.ConfigurationManager;
/** Configuration manager in order to handle and listen schema specific configuration.*/
private final ConfigurationManager configurationMgr;
+ /** Schema. */
+ private final SchemaDescriptor schema;
+
/**
* The constructor.
*
@@ -34,5 +38,39 @@ import org.apache.ignite.configuration.internal.ConfigurationManager;
*/
public SchemaManager(ConfigurationManager configurationMgr) {
this.configurationMgr = configurationMgr;
+
+ this.schema = new SchemaDescriptor(1, // Single hard-coded schema; 1 is presumably its version - TODO confirm.
+ new Column[] { // Presumably the key columns - TODO confirm against SchemaDescriptor.
+ new Column("key", NativeType.LONG, false)
+ },
+ new Column[] { // Presumably the value columns - TODO confirm against SchemaDescriptor.
+ new Column("value", NativeType.LONG, false)
+ }
+ );
+ }
+
+ /**
+ * Gets a current schema for the table specified.
+ *
+ * <p>The table id is not consulted yet: the single schema created in the constructor is returned for every table.
+ *
+ * @param tableId Table id (currently ignored).
+ * @return Schema.
+ */
+ public SchemaDescriptor schema(UUID tableId) {
+ return schema;
+ }
+
+ /**
+ * Gets a schema for specific version.
+ *
+ * <p>Only the single schema held by this manager is available: with assertions enabled, a negative version or
+ * a version different from that schema's version fails. The table id is not consulted.
+ *
+ * @param tableId Table id (currently ignored).
+ * @param ver Schema version, must be non-negative and equal to the version of the held schema.
+ * @return Schema.
+ */
+ public SchemaDescriptor schema(UUID tableId, long ver) {
+ assert ver >= 0;
+
+ assert schema.version() == ver;
+
+ return schema;
}
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
index 34998ab..fbdb013 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
@@ -58,15 +58,15 @@ import org.apache.ignite.lang.IgniteLogger;
*/
@Experimental
public class AsmSerializerGenerator implements SerializerFactory {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(AsmSerializerGenerator.class);
+
/** Serializer package name. */
public static final String SERIALIZER_PACKAGE_NAME = "org.apache.ignite.internal.schema.marshaller";
/** Serializer package name prefix. */
public static final String SERIALIZER_CLASS_NAME_PREFIX = "SerializerForSchema_";
- /** Logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(AsmSerializerGenerator.class);
-
/** Dump generated code. */
private final boolean dumpCode = LOG.isTraceEnabled();
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index dae8a7a..bf93ea1 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -68,8 +68,24 @@
<artifactId>ignite-raft</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-raft-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-metastorage-client</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-affinity</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
index 1242f6e..cceb908 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -29,14 +29,14 @@ abstract class AbstractTableView {
protected final InternalTable tbl;
/** Schema manager. */
- protected final TableSchemaManager schemaMgr;
+ protected final TableSchemaView schemaMgr;
/**
* Constructor
* @param tbl Internal table.
* @param schemaMgr Schema manager.
*/
- protected AbstractTableView(InternalTable tbl, TableSchemaManager schemaMgr) {
+ protected AbstractTableView(InternalTable tbl, TableSchemaView schemaMgr) {
this.tbl = tbl;
this.schemaMgr = schemaMgr;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
index f27642a..c113dc7 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
@@ -49,7 +49,7 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
* @param tbl Table storage.
* @param schemaMgr Schema manager.
*/
- public KVBinaryViewImpl(InternalTable tbl, TableSchemaManager schemaMgr) {
+ public KVBinaryViewImpl(InternalTable tbl, TableSchemaView schemaMgr) {
super(tbl, schemaMgr);
marsh = new TupleMarshallerImpl(schemaMgr);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
index b0642fd..dfb6538 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
@@ -44,7 +44,7 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
* @param keyMapper Key class mapper.
* @param valueMapper Value class mapper.
*/
- public KVViewImpl(InternalTable tbl, TableSchemaManager schemaMgr, KeyMapper<K> keyMapper,
+ public KVViewImpl(InternalTable tbl, TableSchemaView schemaMgr, KeyMapper<K> keyMapper,
ValueMapper<V> valueMapper) {
super(tbl, schemaMgr);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 686c328..0d9ac93 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -42,7 +42,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R
* @param schemaMgr Schema manager.
* @param mapper Record class mapper.
*/
- public RecordViewImpl(InternalTable tbl, TableSchemaManager schemaMgr, RecordMapper<R> mapper) {
+ public RecordViewImpl(InternalTable tbl, TableSchemaView schemaMgr, RecordMapper<R> mapper) {
super(tbl, schemaMgr);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 445859a..cc70462 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -51,7 +51,7 @@ public class TableImpl extends AbstractTableView implements Table {
* @param tbl Table.
* @param schemaMgr Table schema manager.
*/
- public TableImpl(InternalTable tbl, TableSchemaManager schemaMgr) {
+ public TableImpl(InternalTable tbl, TableSchemaView schemaMgr) {
super(tbl, schemaMgr);
marsh = new TupleMarshallerImpl(schemaMgr);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
similarity index 96%
rename from modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaManager.java
rename to modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
index 09961ab..d63d71c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaView.java
@@ -22,7 +22,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
/**
* Table schema manager interface.
*/
-public interface TableSchemaManager {
+public interface TableSchemaView {
/**
* @return Current schema.
*/
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
index 5b8a217..9729a79 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
@@ -31,14 +31,14 @@ import org.jetbrains.annotations.NotNull;
*/
class TupleMarshallerImpl implements TupleMarshaller {
/** Schema manager. */
- private final TableSchemaManager schemaMgr;
+ private final TableSchemaView schemaMgr;
/**
* Constructor.
*
* @param schemaMgr Schema manager.
*/
- TupleMarshallerImpl(TableSchemaManager schemaMgr) {
+ TupleMarshallerImpl(TableSchemaView schemaMgr) {
this.schemaMgr = schemaMgr;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
new file mode 100644
index 0000000..9f745d4
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
+import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.raft.Loza;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.metastorage.common.Conditions;
+import org.apache.ignite.metastorage.common.Key;
+import org.apache.ignite.metastorage.common.Operations;
+import org.apache.ignite.metastorage.common.WatchEvent;
+import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.manager.IgniteTables;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Table manager.
+ * <p>
+ * Listens to the tables configuration and to the table assignment entries of the meta storage. Once a non-empty
+ * assignment is published for a table, one raft group per partition is started and the resulting {@link Table}
+ * is registered under the table name.
+ */
+public class TableManager implements IgniteTables {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class);
+
+    /** Internal prefix for the metastorage. */
+    private static final String INTERNAL_PREFIX = "internal.tables.";
+
+    /** Meta storage service. */
+    private final MetaStorageManager metaStorageMgr;
+
+    /** Configuration manager. */
+    private final ConfigurationManager configurationMgr;
+
+    /** Future of the meta storage watch (registered in the constructor) that starts tables on assignment updates. */
+    private CompletableFuture<Long> tableCreationSubscriptionFut;
+
+    /**
+     * Started tables by name.
+     * The map is filled by the meta storage watch listener while {@link #createTable} polls it from the caller
+     * thread, so a concurrent map is required for the update to become visible safely.
+     */
+    private final Map<String, Table> tables = new ConcurrentHashMap<>();
+
+    /**
+     * @param configurationMgr Configuration manager.
+     * @param metaStorageMgr Meta storage manager.
+     * @param schemaManager Schema manager.
+     * @param raftMgr Raft manager.
+     * @param vaultManager Vault manager, used to resolve a table name by the table id.
+     */
+    public TableManager(
+        ConfigurationManager configurationMgr,
+        MetaStorageManager metaStorageMgr,
+        SchemaManager schemaManager,
+        Loza raftMgr,
+        VaultManager vaultManager
+    ) {
+        this.configurationMgr = configurationMgr;
+        this.metaStorageMgr = metaStorageMgr;
+
+        String localNodeName = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
+            .name().value();
+
+        // Re-evaluate the subscription every time the list of metastorage nodes changes.
+        configurationMgr.configurationRegistry().getConfiguration(ClusterConfiguration.KEY)
+            .metastorageNodes().listen(ctx -> {
+                if (ctx.newValue() != null) {
+                    if (hasMetastorageLocally(localNodeName, ctx.newValue()))
+                        subscribeForTableCreation();
+                    else
+                        unsubscribeForTableCreation();
+                }
+
+                return CompletableFuture.completedFuture(null);
+            });
+
+        String[] metastorageMembers = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
+            .metastorageNodes().value();
+
+        if (hasMetastorageLocally(localNodeName, metastorageMembers))
+            subscribeForTableCreation();
+
+        String tableInternalPrefix = INTERNAL_PREFIX + "assignment.#";
+
+        tableCreationSubscriptionFut = metaStorageMgr.registerWatch(new Key(tableInternalPrefix), new WatchListener() {
+            @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+                for (WatchEvent evt : events) {
+                    // An empty value is only a request to calculate the assignment; nothing to start yet.
+                    if (!ArrayUtils.empty(evt.newEntry().value())) {
+                        String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
+
+                        // The key tail is "assignment.<table id>" (see subscribeForTableCreation), so the id is
+                        // the part after the first dot. Taking the part before it yields the literal "assignment",
+                        // which is not a valid UUID string.
+                        String placeholderValue = keyTail.substring(keyTail.indexOf('.') + 1);
+
+                        UUID tblId = UUID.fromString(placeholderValue);
+
+                        try {
+                            String name = new String(vaultManager.get((INTERNAL_PREFIX + tblId.toString())
+                                .getBytes(StandardCharsets.UTF_8)).get().value(), StandardCharsets.UTF_8);
+
+                            int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+                                .tables().get(name).partitions().value();
+
+                            List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)IgniteUtils.fromBytes(
+                                evt.newEntry().value());
+
+                            HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
+
+                            for (int p = 0; p < partitions; p++) {
+                                partitionMap.put(p, raftMgr.startRaftGroup(
+                                    name + "_part_" + p,
+                                    assignment.get(p),
+                                    new PartitionCommandListener()
+                                ));
+                            }
+
+                            tables.put(name, new TableImpl(
+                                new InternalTableImpl(
+                                    tblId,
+                                    partitionMap,
+                                    partitions
+                                ),
+                                new TableSchemaView() {
+                                    @Override public SchemaDescriptor schema() {
+                                        return schemaManager.schema(tblId);
+                                    }
+
+                                    @Override public SchemaDescriptor schema(int ver) {
+                                        return schemaManager.schema(tblId, ver);
+                                    }
+                                }));
+                        }
+                        catch (InterruptedException | ExecutionException e) {
+                            // Do not lose the interruption request of the calling thread.
+                            if (e instanceof InterruptedException)
+                                Thread.currentThread().interrupt();
+
+                            LOG.error("Failed to start table [key={}]",
+                                evt.newEntry().key(), e);
+                        }
+                    }
+                }
+
+                return true;
+            }
+
+            @Override public void onError(@NotNull Throwable e) {
+                LOG.error("Metastorage listener issue", e);
+            }
+        });
+    }
+
+    /**
+     * Checks whether the local node hosts Metastorage.
+     *
+     * @param localNodeName Local node unique name.
+     * @param metastorageMembers Metastorage members names, {@code null} is treated as an empty list.
+     * @return True if the node has Metastorage, false otherwise.
+     */
+    private boolean hasMetastorageLocally(String localNodeName, String[] metastorageMembers) {
+        if (metastorageMembers == null)
+            return false;
+
+        for (String name : metastorageMembers) {
+            if (name.equals(localNodeName))
+                return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Subscribes on table create.
+     * <p>
+     * Registers a listener on the tables configuration. For every table name that is present in the new
+     * configuration value but not in the old one, the table name is written to the meta storage under the table id
+     * by a conditional invoke, and, if that invoke reports success, an empty assignment entry is put for the table.
+     */
+    private void subscribeForTableCreation() {
+        //TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
+        // NOTE(review): every call registers one more configuration listener and none is ever removed - confirm
+        // this method cannot be reached more than once per node.
+        configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+            .tables().listen(ctx -> {
+                HashSet<String> tblNamesToStart = new HashSet<>(ctx.newValue().namedListKeys());
+
+                long revision = ctx.storageRevision();
+
+                if (ctx.oldValue() != null)
+                    tblNamesToStart.removeAll(ctx.oldValue().namedListKeys());
+
+                for (String tblName : tblNamesToStart) {
+                    TableView tableView = ctx.newValue().get(tblName);
+
+                    // NOTE(review): 'update' is always 0, so all tables added within one configuration revision
+                    // get the same id - presumably it was meant to be a per-table counter; confirm.
+                    long update = 0;
+
+                    UUID tblId = new UUID(revision, update);
+
+                    CompletableFuture<Boolean> fut = metaStorageMgr.invoke(
+                        new Key(INTERNAL_PREFIX + tblId.toString()),
+                        Conditions.value().eq(null),
+                        Operations.put(tableView.name().getBytes(StandardCharsets.UTF_8)),
+                        Operations.noop());
+
+                    try {
+                        if (fut.get()) {
+                            metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0]);
+
+                            LOG.info("Table manager created a table [name={}, revision={}]",
+                                tableView.name(), revision);
+                        }
+                    }
+                    catch (InterruptedException | ExecutionException e) {
+                        // Do not lose the interruption request of the calling thread.
+                        if (e instanceof InterruptedException)
+                            Thread.currentThread().interrupt();
+
+                        LOG.error("Table was not fully initialized [name={}, revision={}]",
+                            tableView.name(), revision, e);
+                    }
+                }
+
+                return CompletableFuture.completedFuture(null);
+            });
+    }
+
+    /**
+     * Unsubscribe from table creation.
+     * <p>
+     * Waits for the watch registration future, unregisters the meta storage watch and clears the future.
+     * Does nothing if there is no registered watch.
+     */
+    private void unsubscribeForTableCreation() {
+        if (tableCreationSubscriptionFut == null)
+            return;
+
+        try {
+            Long subscriptionId = tableCreationSubscriptionFut.get();
+
+            metaStorageMgr.unregisterWatch(subscriptionId);
+
+            tableCreationSubscriptionFut = null;
+        }
+        catch (InterruptedException | ExecutionException e) {
+            // Do not lose the interruption request of the calling thread.
+            if (e instanceof InterruptedException)
+                Thread.currentThread().interrupt();
+
+            LOG.error("Couldn't unsubscribe from table creation", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
+        configurationMgr.configurationRegistry()
+            .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
+                change.create(name, tableInitChange));
+
+        //TODO: IGNITE-14646 Support asynchronous table creation
+        Table tbl = null;
+
+        // The wait itself stays uninterruptible (the method never returns null), but the interruption request
+        // is remembered and restored on exit instead of being silently dropped.
+        boolean interrupted = false;
+
+        try {
+            while (tbl == null) {
+                try {
+                    Thread.sleep(50);
+
+                    tbl = table(name);
+                }
+                catch (InterruptedException e) {
+                    interrupted = true;
+
+                    LOG.error("Waiting of creation of table was interrupted.", e);
+                }
+            }
+        }
+        finally {
+            if (interrupted)
+                Thread.currentThread().interrupt();
+        }
+
+        return tbl;
+    }
+
+    /** {@inheritDoc} */
+    @Override public List<Table> tables() {
+        return new ArrayList<>(tables.values());
+    }
+
+    /** {@inheritDoc} */
+    @Override public Table table(String name) {
+        // A concurrent map rejects null keys; keep returning null for a null name as before.
+        return name == null ? null : tables.get(name);
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManagerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManagerImpl.java
deleted file mode 100644
index 5932ea1..0000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManagerImpl.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed;
-
-import org.apache.ignite.configuration.internal.ConfigurationManager;
-import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.network.ClusterService;
-import org.apache.ignite.internal.schema.SchemaManager;
-import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.table.manager.TableManager;
-
-/**
- * Table Manager that handles inner table lifecycle and provide corresponding API methods.
- */
-// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
-@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class TableManagerImpl implements TableManager {
- /** Meta storage service. */
- private final MetaStorageManager metaStorageMgr;
-
- /** Network cluster. */
- private final ClusterService clusterNetSvc;
-
- /** Schema manager. */
- private final SchemaManager schemaMgr;
-
- /** Configuration manager. */
- private final ConfigurationManager configurationMgr;
-
- /** Raft manager. */
- private final Loza raftMgr;
-
- /**
- * The constructor.
- *
- * @param configurationMgr Configuration table.
- * @param clusterNetSvc Cluster network service.
- * @param metaStorageMgr MetaStorage manager.
- * @param schemaMgr Schema manager.
- * @param raftMgr Raft manager.
- */
- public TableManagerImpl(
- ConfigurationManager configurationMgr,
- ClusterService clusterNetSvc,
- MetaStorageManager metaStorageMgr,
- SchemaManager schemaMgr,
- Loza raftMgr
- ) {
- this.configurationMgr = configurationMgr;
- this.clusterNetSvc = clusterNetSvc;
- this.metaStorageMgr = metaStorageMgr;
- this.schemaMgr = schemaMgr;
- this.raftMgr = raftMgr;
- }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
new file mode 100644
index 0000000..eabe187
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Write command that removes the entry identified by the passed key row.
+ */
+public class DeleteCommand implements WriteCommand {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(DeleteCommand.class);
+
+    /** Key row. Transient: re-created on demand from {@link #keyRowBytes}. */
+    private transient BinaryRow keyRow;
+
+    /*
+     * Key row bytes.
+     * Temporary workaround until the network layer implements correct serialization of BinaryRow.
+     * TODO: Remove the field after.
+     */
+    private byte[] keyRowBytes;
+
+    /**
+     * Creates the command and captures the byte form of the key row right away.
+     *
+     * @param keyRow Key row.
+     */
+    public DeleteCommand(@NotNull BinaryRow keyRow) {
+        assert keyRow != null;
+
+        rowToBytes(keyRow, serialized -> keyRowBytes = serialized);
+
+        this.keyRow = keyRow;
+    }
+
+    /**
+     * Dumps a row into a byte array and hands the array over to the consumer.
+     * If the row cannot be written, the failure is logged and the consumer receives {@code null}.
+     *
+     * @param row Row.
+     * @param consumer Byte array consumer.
+     */
+    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
+        byte[] serialized;
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            row.writeTo(out);
+
+            out.flush();
+
+            serialized = out.toByteArray();
+        }
+        catch (IOException e) {
+            LOG.error("Could not write row to stream [row=" + row + ']', e);
+
+            serialized = null;
+        }
+
+        consumer.accept(serialized);
+    }
+
+    /**
+     * Returns the key row, restoring it from the stored bytes when the row object is not available.
+     *
+     * @return Key row.
+     */
+    public BinaryRow getKeyRow() {
+        if (keyRow != null)
+            return keyRow;
+
+        keyRow = new ByteBufferRow(keyRowBytes);
+
+        return keyRow;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java
new file mode 100644
index 0000000..597aba3
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Read command that fetches the value stored for the specified key row.
+ */
+public class GetCommand implements ReadCommand {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
+
+    /** Key row. Transient: re-created on demand from {@link #keyRowBytes}. */
+    private transient BinaryRow keyRow;
+
+    /*
+     * Key row bytes.
+     * Temporary workaround until the network layer implements correct serialization of BinaryRow.
+     * TODO: Remove the field after.
+     */
+    private byte[] keyRowBytes;
+
+    /**
+     * Creates the command and captures the byte form of the key row right away.
+     *
+     * @param keyRow Key row.
+     */
+    public GetCommand(@NotNull BinaryRow keyRow) {
+        assert keyRow != null;
+
+        rowToBytes(keyRow, serialized -> keyRowBytes = serialized);
+
+        this.keyRow = keyRow;
+    }
+
+    /**
+     * Dumps a row into a byte array and hands the array over to the consumer.
+     * If the row cannot be written, the failure is logged and the consumer receives {@code null}.
+     *
+     * @param row Row.
+     * @param consumer Byte array consumer.
+     */
+    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
+        byte[] serialized;
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            row.writeTo(out);
+
+            out.flush();
+
+            serialized = out.toByteArray();
+        }
+        catch (IOException e) {
+            LOG.error("Could not write row to stream [row=" + row + ']', e);
+
+            serialized = null;
+        }
+
+        consumer.accept(serialized);
+    }
+
+    /**
+     * Returns the key row, restoring it from the stored bytes when the row object is not available.
+     *
+     * @return Key row.
+     */
+    public BinaryRow getKeyRow() {
+        if (keyRow != null)
+            return keyRow;
+
+        keyRow = new ByteBufferRow(keyRowBytes);
+
+        return keyRow;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
new file mode 100644
index 0000000..be6ddcc
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command inserts a row.
+ */
+public class InsertCommand implements WriteCommand {
+    /** The logger. Bound to this class so that log records are attributed to the insert command. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(InsertCommand.class);
+
+    /** Row. Transient: re-created on demand from {@link #rowBytes}. */
+    private transient BinaryRow row;
+
+    /*
+     * Row bytes.
+     * Temporary workaround until the network layer implements correct serialization of BinaryRow.
+     * TODO: Remove the field after.
+     */
+    private byte[] rowBytes;
+
+    /**
+     * Creates the command and captures the byte form of the row right away.
+     *
+     * @param row Row.
+     */
+    public InsertCommand(@NotNull BinaryRow row) {
+        assert row != null;
+
+        this.row = row;
+
+        rowToBytes(row, bytes -> rowBytes = bytes);
+    }
+
+    /**
+     * Writes a row to byte array.
+     * If the row cannot be written, the failure is logged and the consumer receives {@code null}.
+     *
+     * @param row Row.
+     * @param consumer Byte array consumer.
+     */
+    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            row.writeTo(baos);
+
+            baos.flush();
+
+            consumer.accept(baos.toByteArray());
+        }
+        catch (IOException e) {
+            LOG.error("Could not write row to stream [row=" + row + ']', e);
+
+            consumer.accept(null);
+        }
+    }
+
+    /**
+     * Gets a data row, restoring it from the stored bytes when the row object is not available.
+     *
+     * @return Data row.
+     */
+    public BinaryRow getRow() {
+        if (row == null)
+            row = new ByteBufferRow(rowBytes);
+
+        return row;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
new file mode 100644
index 0000000..03c83a2
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Write command that swaps an old entry for a new one.
+ */
+public class ReplaceCommand implements WriteCommand {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(ReplaceCommand.class);
+
+    /** New row. Transient: re-created on demand from {@link #rowBytes}. */
+    private transient BinaryRow row;
+
+    /** Old row. Transient: re-created on demand from {@link #oldRowBytes}. */
+    private transient BinaryRow oldRow;
+
+    /*
+     * Row bytes.
+     * Temporary workaround until the network layer implements correct serialization of BinaryRow.
+     * TODO: Remove the field after.
+     */
+    private byte[] rowBytes;
+
+    /**
+     * Old row bytes.
+     * TODO: Remove the field after.
+     */
+    private byte[] oldRowBytes;
+
+    /**
+     * Creates the command and captures the byte form of both rows right away.
+     *
+     * @param oldRow Old row.
+     * @param row Row.
+     */
+    public ReplaceCommand(@NotNull BinaryRow oldRow, @NotNull BinaryRow row) {
+        assert oldRow != null;
+        assert row != null;
+
+        rowToBytes(oldRow, serialized -> oldRowBytes = serialized);
+        rowToBytes(row, serialized -> rowBytes = serialized);
+
+        this.oldRow = oldRow;
+        this.row = row;
+    }
+
+    /**
+     * Dumps a row into a byte array and hands the array over to the consumer.
+     * If the row cannot be written, the failure is logged and the consumer receives {@code null}.
+     *
+     * @param row Row.
+     * @param consumer Byte array consumer.
+     */
+    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
+        byte[] serialized;
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            row.writeTo(out);
+
+            out.flush();
+
+            serialized = out.toByteArray();
+        }
+        catch (IOException e) {
+            LOG.error("Could not write row to stream [row=" + row + ']', e);
+
+            serialized = null;
+        }
+
+        consumer.accept(serialized);
+    }
+
+    /**
+     * Returns the new row, restoring it from the stored bytes when the row object is not available.
+     *
+     * @return Data row.
+     */
+    public BinaryRow getRow() {
+        if (row != null)
+            return row;
+
+        row = new ByteBufferRow(rowBytes);
+
+        return row;
+    }
+
+    /**
+     * Returns the old row, restoring it from the stored bytes when the row object is not available.
+     *
+     * @return Data row.
+     */
+    public BinaryRow getOldRow() {
+        if (oldRow != null)
+            return oldRow;
+
+        oldRow = new ByteBufferRow(oldRowBytes);
+
+        return oldRow;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java
new file mode 100644
index 0000000..cc46aca
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Write command that inserts a value for the specified key or updates the existing one.
+ */
+public class UpsertCommand implements WriteCommand {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(UpsertCommand.class);
+
+    /** Row. Transient: re-created on demand from {@link #rowBytes}. */
+    private transient BinaryRow row;
+
+    /*
+     * Row bytes.
+     * Temporary workaround until the network layer implements correct serialization of BinaryRow.
+     * TODO: Remove the field after.
+     */
+    private byte[] rowBytes;
+
+    /**
+     * Creates the command and captures the byte form of the row right away.
+     *
+     * @param row Row.
+     */
+    public UpsertCommand(@NotNull BinaryRow row) {
+        assert row != null;
+
+        rowToBytes(row, serialized -> rowBytes = serialized);
+
+        this.row = row;
+    }
+
+    /**
+     * Dumps a row into a byte array and hands the array over to the consumer.
+     * If the row cannot be written, the failure is logged and the consumer receives {@code null}.
+     *
+     * @param row Row.
+     * @param consumer Byte array consumer.
+     */
+    private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
+        byte[] serialized;
+
+        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+            row.writeTo(out);
+
+            out.flush();
+
+            serialized = out.toByteArray();
+        }
+        catch (IOException e) {
+            LOG.error("Could not write row to stream [row=" + row + ']', e);
+
+            serialized = null;
+        }
+
+        consumer.accept(serialized);
+    }
+
+    /**
+     * Returns the data row, restoring it from the stored bytes when the row object is not available.
+     *
+     * @return Data row.
+     */
+    public BinaryRow getRow() {
+        if (row != null)
+            return row;
+
+        row = new ByteBufferRow(rowBytes);
+
+        return row;
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java
new file mode 100644
index 0000000..803cee1
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.response;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * It is a response object for handling a table get command.
+ */
+public class KVGetResponse implements Serializable {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
+
+ /** Row. */
+ private transient BinaryRow row;
+
+ /*
+ * Row bytes.
+ * It is a temporary solution, before network have not implement correct serialization BinaryRow.
+ * TODO: Remove the field after.
+ */
+ private byte[] rowBytes;
+
+ public KVGetResponse(BinaryRow row) {
+ this.row = row;
+
+ rowToBytes(row, bytes -> rowBytes = bytes);
+ }
+
+ /**
+ * Writes a row to byte array.
+ *
+ * @param row Row.
+ * @param consumer Byte array consumer.
+ */
+ private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
+ if (row == null) {
+ consumer.accept(null);
+
+ return;
+ }
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ row.writeTo(baos);
+
+ baos.flush();
+
+ consumer.accept(baos.toByteArray());
+ }
+ catch (IOException e) {
+ LOG.error("Could not write row to stream [row=" + row + ']', e);
+
+ consumer.accept(null);
+ }
+ }
+
+ /**
+ * @return Data row.
+ */
+ public BinaryRow getValue() {
+ if (row == null && rowBytes != null)
+ row = new ByteBufferRow(rowBytes);
+
+ return row;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
new file mode 100644
index 0000000..b7d9f75
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListener.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupCommandListener;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Partition command handler.
+ */
+public class PartitionCommandListener implements RaftGroupCommandListener {
+ /** Storage. */
+ private ConcurrentHashMap<KeyWrapper, BinaryRow> storage = new ConcurrentHashMap<>();
+
+ /** {@inheritDoc} */
+ @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+ while (iterator.hasNext()) {
+ CommandClosure<ReadCommand> clo = iterator.next();
+
+ assert clo.command() instanceof GetCommand;
+
+ clo.success(new KVGetResponse(storage.get(extractAndWrapKey(((GetCommand)clo.command()).getKeyRow()))));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+ while (iterator.hasNext()) {
+ CommandClosure<WriteCommand> clo = iterator.next();
+
+ if (clo.command() instanceof InsertCommand) {
+ BinaryRow previous = storage.putIfAbsent(
+ extractAndWrapKey(((InsertCommand)clo.command()).getRow()),
+ ((InsertCommand)clo.command()).getRow()
+ );
+
+ clo.success(previous == null);
+ }
+ else if (clo.command() instanceof DeleteCommand) {
+ BinaryRow deleted = storage.remove(
+ extractAndWrapKey(((DeleteCommand)clo.command()).getKeyRow())
+ );
+
+ clo.success(deleted != null);
+ }
+ else if (clo.command() instanceof ReplaceCommand) {
+ ReplaceCommand cmd = ((ReplaceCommand)clo.command());
+
+ BinaryRow expected = cmd.getOldRow();
+
+ KeyWrapper key = extractAndWrapKey(expected);
+
+ BinaryRow current = storage.get(key);
+
+ if ((current == null && !expected.hasValue()) ||
+ equalValues(current, expected)) {
+ storage.put(key, cmd.getRow());
+
+ clo.success(true);
+ }
+ else
+ clo.success(false);
+ }
+ else if (clo.command() instanceof UpsertCommand) {
+ storage.put(
+ extractAndWrapKey(((UpsertCommand)clo.command()).getRow()),
+ ((UpsertCommand)clo.command()).getRow()
+ );
+
+ clo.success(null);
+ }
+ else
+ assert false : "Command was not found [cmd=" + clo.command() + ']';
+ }
+ }
+
+ /**
+ * @param row Row.
+ * @return Extracted key.
+ */
+ @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {
+ if (row.hasValue() ^ row2.hasValue())
+ return false;
+
+ return row.valueSlice().compareTo(row2.valueSlice()) == 0;
+ }
+
+ /**
+ * @param row Row.
+ * @return Extracted key.
+ */
+ @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
+ final byte[] bytes = new byte[row.keySlice().capacity()];
+ row.keySlice().get(bytes);
+
+ return new KeyWrapper(bytes, row.hash());
+ }
+
+ /**
+ * Wrapper provides correct byte[] comparison.
+ */
+ private static class KeyWrapper {
+ /** Data. */
+ private final byte[] data;
+
+ /** Hash. */
+ private final int hash;
+
+ /**
+ * Constructor.
+ *
+ * @param data Wrapped data.
+ */
+ KeyWrapper(byte[] data, int hash) {
+ assert data != null;
+
+ this.data = data;
+ this.hash = hash;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ KeyWrapper wrapper = (KeyWrapper)o;
+ return Arrays.equals(data, wrapper.data);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return hash;
+ }
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
new file mode 100644
index 0000000..2ff5e8b
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.storage;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Storage of table rows.
+ */
+public class InternalTableImpl implements InternalTable {
+    /** Partition map: partition id to the raft group service serving it. */
+    private final Map<Integer, RaftGroupService> partitionMap;
+
+    /** Partitions. */
+    private final int partitions;
+
+    /**
+     * @param tableId Table id (currently not used by this class).
+     * @param partMap Map partition id to raft group.
+     * @param partitions Partitions.
+     */
+    public InternalTableImpl(
+        UUID tableId,
+        Map<Integer, RaftGroupService> partMap,
+        int partitions
+    ) {
+        this.partitionMap = partMap;
+        this.partitions = partitions;
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
+        return partition(keyRow).<KVGetResponse>run(new GetCommand(keyRow))
+            .thenApply(KVGetResponse::getValue);
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
+        return notImplemented("getAll");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> upsert(BinaryRow row) {
+        return partition(row).<Void>run(new UpsertCommand(row));
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows) {
+        return notImplemented("upsertAll");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<BinaryRow> getAndUpsert(BinaryRow row) {
+        return notImplemented("getAndUpsert");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Boolean> insert(BinaryRow row) {
+        return partition(row).<Boolean>run(new InsertCommand(row));
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows) {
+        return notImplemented("insertAll");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Boolean> replace(BinaryRow row) {
+        return notImplemented("replace");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Boolean> replace(BinaryRow oldRow, BinaryRow newRow) {
+        return partition(oldRow).<Boolean>run(new ReplaceCommand(oldRow, newRow));
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<BinaryRow> getAndReplace(BinaryRow row) {
+        return notImplemented("getAndReplace");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Boolean> delete(BinaryRow keyRow) {
+        return partition(keyRow).<Boolean>run(new DeleteCommand(keyRow));
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Boolean> deleteExact(BinaryRow oldRow) {
+        return notImplemented("deleteExact");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<BinaryRow> getAndDelete(BinaryRow row) {
+        return notImplemented("getAndDelete");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows) {
+        return notImplemented("deleteAll");
+    }
+
+    /** {@inheritDoc} */
+    @Override public @NotNull CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows) {
+        return notImplemented("deleteAllExact");
+    }
+
+    /**
+     * Resolves the raft group that serves the partition the row belongs to.
+     *
+     * @param row Row (or key row).
+     * @return Raft group service of the row's partition.
+     */
+    private RaftGroupService partition(BinaryRow row) {
+        // floorMod keeps the partition id in [0, partitions) for negative hashes as well,
+        // whereas the % operator would yield a negative id that is absent from the map.
+        return partitionMap.get(Math.floorMod(row.hash(), partitions));
+    }
+
+    /**
+     * Creates the result for an operation that has no implementation yet, so that callers receive
+     * a failed future instead of {@code null}.
+     *
+     * @param op Operation name.
+     * @param <T> Future result type.
+     * @return Future completed exceptionally with {@link UnsupportedOperationException}.
+     */
+    private static <T> CompletableFuture<T> notImplemented(String op) {
+        return CompletableFuture.failedFuture(
+            new UnsupportedOperationException("Operation is not implemented yet [op=" + op + ']'));
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/distributed/DistributedTableTest.java b/modules/table/src/test/java/org/apache/ignite/table/distributed/DistributedTableTest.java
new file mode 100644
index 0000000..f287a08
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/table/distributed/DistributedTableTest.java
@@ -0,0 +1,349 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.distributed;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.affinity.RendezvousAffinityFunction;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.network.ClusterLocalConfiguration;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterService;
+import org.apache.ignite.network.ClusterServiceFactory;
+import org.apache.ignite.network.message.MessageSerializationRegistry;
+import org.apache.ignite.network.scalecube.ScaleCubeClusterServiceFactory;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.message.RaftClientMessageFactory;
+import org.apache.ignite.raft.client.message.impl.RaftClientMessageFactoryImpl;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
+import org.apache.ignite.raft.server.RaftServer;
+import org.apache.ignite.raft.server.impl.RaftServerImpl;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Distributed internal table tests.
+ */
+public class DistributedTableTest {
+    /** The logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(DistributedTableTest.class);
+
+    /** Base network port. */
+    public static final int NODE_PORT_BASE = 20_000;
+
+    /** Nodes. */
+    public static final int NODES = 5;
+
+    /** Partitions. */
+    public static final int PARTS = 10;
+
+    /** Factory. */
+    private static final RaftClientMessageFactory FACTORY = new RaftClientMessageFactoryImpl();
+
+    /** Network factory. */
+    private static final ClusterServiceFactory NETWORK_FACTORY = new ScaleCubeClusterServiceFactory();
+
+    /** */
+    // TODO: IGNITE-14088: Uncomment and use real serializer provider
+    private static final MessageSerializationRegistry SERIALIZATION_REGISTRY = new MessageSerializationRegistry();
+
+    /** Client. */
+    private ClusterService client;
+
+    /** Schema. */
+    public static SchemaDescriptor SCHEMA = new SchemaDescriptor(1, new Column[] {
+        new Column("key", NativeType.LONG, false)
+    }, new Column[] {
+        new Column("value", NativeType.LONG, false)
+    });
+
+    /** Cluster. */
+    private final List<ClusterService> cluster = new ArrayList<>();
+
+    /**
+     * Starts the server nodes and the client node and waits until each of them sees the expected topology.
+     */
+    @BeforeEach
+    public void beforeTest() {
+        // Addresses of all server nodes; shared by the servers and the client.
+        List<String> servers = IntStream.range(NODE_PORT_BASE, NODE_PORT_BASE + NODES)
+            .boxed()
+            .map((port) -> "localhost:" + port)
+            .collect(Collectors.toList());
+
+        for (int i = 0; i < NODES; i++)
+            cluster.add(startClient("node_" + i, NODE_PORT_BASE + i, servers));
+
+        for (ClusterService node : cluster)
+            assertTrue(waitForTopology(node, NODES, 1000));
+
+        LOG.info("Cluster started.");
+
+        client = startClient("client", NODE_PORT_BASE + NODES, servers);
+
+        assertTrue(waitForTopology(client, NODES + 1, 1000));
+
+        LOG.info("Client started.");
+    }
+
+    /**
+     * Stops all nodes started by {@link #beforeTest()}.
+     *
+     * @throws Exception If a node failed to stop.
+     */
+    @AfterEach
+    public void afterTest() throws Exception {
+        for (ClusterService node : cluster)
+            node.shutdown();
+
+        // The client is absent if the setup failed before it was started.
+        if (client != null)
+            client.shutdown();
+    }
+
+    /**
+     * Checks insert and get commands against a single partition listener.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void partitionListener() throws Exception {
+        String grpId = "part";
+
+        RaftServer partSrv = new RaftServerImpl(
+            cluster.get(0),
+            FACTORY,
+            1000,
+            Map.of(grpId, new PartitionCommandListener())
+        );
+
+        try {
+            RaftGroupService partRaftGrp = new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000,
+                List.of(new Peer(cluster.get(0).topologyService().localMember())), true, 200);
+
+            Row testRow = getTestRow();
+
+            CompletableFuture<Boolean> insertFur = partRaftGrp.run(new InsertCommand(testRow));
+
+            assertTrue(insertFur.get());
+
+            Row keyChunk = getTestKey();
+
+            CompletableFuture<KVGetResponse> getFur = partRaftGrp.run(new GetCommand(keyChunk));
+
+            assertNotNull(getFur.get().getValue());
+
+            assertEquals(testRow.longValue(1), new Row(SCHEMA, getFur.get().getValue()).longValue(1));
+        }
+        finally {
+            // Stop the server even if an assertion above fails.
+            partSrv.shutdown();
+        }
+    }
+
+    /**
+     * @return Row that contains only the key column of the test row.
+     */
+    @NotNull private Row getTestKey() {
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
+
+        rowBuilder.appendLong(1L);
+
+        return new Row(SCHEMA, new ByteBufferRow(rowBuilder.build()));
+    }
+
+    /**
+     * @return Row with key {@code 1} and value {@code 10}.
+     */
+    @NotNull private Row getTestRow() {
+        RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
+
+        rowBuilder.appendLong(1L);
+        rowBuilder.appendLong(10L);
+
+        return new Row(SCHEMA, new ByteBufferRow(rowBuilder.build()));
+    }
+
+    /**
+     * Checks key-value operations of a table whose partitions are spread over the cluster nodes.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void partitionedTable() throws Exception {
+        Map<ClusterNode, RaftServer> raftServers = new HashMap<>(NODES);
+
+        try {
+            for (int i = 0; i < NODES; i++) {
+                raftServers.put(cluster.get(i).topologyService().localMember(), new RaftServerImpl(
+                    cluster.get(i),
+                    FACTORY,
+                    1000,
+                    Map.of()
+                ));
+            }
+
+            List<List<ClusterNode>> assignment = RendezvousAffinityFunction.assignPartitions(
+                cluster.stream().map(node -> node.topologyService().localMember()).collect(Collectors.toList()),
+                PARTS,
+                1,
+                false,
+                null
+            );
+
+            int p = 0;
+
+            Map<Integer, RaftGroupService> partMap = new HashMap<>();
+
+            for (List<ClusterNode> partNodes : assignment) {
+                RaftServer rs = raftServers.get(partNodes.get(0));
+
+                String grpId = "part-" + p;
+
+                rs.setListener(grpId, new PartitionCommandListener());
+
+                partMap.put(p, new RaftGroupServiceImpl(grpId, client, FACTORY, 10_000,
+                    List.of(new Peer(partNodes.get(0))), true, 200));
+
+                p++;
+            }
+
+            Table tbl = new TableImpl(new InternalTableImpl(
+                UUID.randomUUID(),
+                partMap,
+                PARTS
+            ), new TableSchemaView() {
+                @Override public SchemaDescriptor schema() {
+                    return SCHEMA;
+                }
+
+                @Override public SchemaDescriptor schema(int ver) {
+                    return SCHEMA;
+                }
+            });
+
+            for (int i = 0; i < PARTS * 10; i++) {
+                tbl.kvView().putIfAbsent(
+                    tbl.kvView().tupleBuilder()
+                        .set("key", Long.valueOf(i))
+                        .build(),
+                    tbl.kvView().tupleBuilder()
+                        .set("value", Long.valueOf(i + 2))
+                        .build());
+            }
+
+            for (int i = 0; i < PARTS * 10; i++) {
+                Tuple entry = tbl.kvView().get(
+                    tbl.kvView().tupleBuilder()
+                        .set("key", Long.valueOf(i))
+                        .build());
+
+                LOG.info("The result is [key=" + i + ", tuple=" + entry + ']');
+
+                assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+            }
+
+            for (int i = 0; i < PARTS * 10; i++) {
+                tbl.kvView().put(
+                    tbl.kvView().tupleBuilder()
+                        .set("key", Long.valueOf(i))
+                        .build(),
+                    tbl.kvView().tupleBuilder()
+                        .set("value", Long.valueOf(i + 5))
+                        .build());
+
+                Tuple entry = tbl.kvView().get(
+                    tbl.kvView().tupleBuilder()
+                        .set("key", Long.valueOf(i))
+                        .build());
+
+                assertEquals(Long.valueOf(i + 5), entry.longValue("value"));
+            }
+
+            for (int i = 0; i < PARTS * 10; i++) {
+                boolean res = tbl.kvView().replace(
+                    tbl.kvView().tupleBuilder()
+                        .set("key", Long.valueOf(i))
+                        .build(),
+                    tbl.kvView().tupleBuilder()
+                        .set("value", Long.valueOf(i + 5))
+                        .build(),
+                    tbl.kvView().tupleBuilder()
+                        .set("value", Long.valueOf(i + 2))
+                        .build());
+
+                assertTrue(res);
+            }
+
+            for (int i = 0; i < PARTS * 10; i++) {
+                boolean res = tbl.kvView().remove(
+                    tbl.kvView().tupleBuilder()
+                        .set("key", Long.valueOf(i))
+                        .build());
+
+                assertTrue(res);
+
+                Tuple entry = tbl.kvView().get(
+                    tbl.kvView().tupleBuilder()
+                        .set("key", Long.valueOf(i))
+                        .build());
+
+                assertNull(entry);
+            }
+        }
+        finally {
+            // Stop every raft server started by this test, whether it passed or failed.
+            for (RaftServer srv : raftServers.values())
+                srv.shutdown();
+        }
+    }
+
+    /**
+     * @param name Node name.
+     * @param port Local port.
+     * @param servers Server nodes of the cluster.
+     * @return The client cluster view.
+     */
+    private ClusterService startClient(String name, int port, List<String> servers) {
+        var context = new ClusterLocalConfiguration(name, port, servers, SERIALIZATION_REGISTRY);
+        var network = NETWORK_FACTORY.createClusterService(context);
+        network.start();
+        return network;
+    }
+
+    /**
+     * @param service The cluster service whose topology view is polled.
+     * @param expected Expected count.
+     * @param timeout The timeout in millis.
+     * @return {@code True} if the topology reached at least the expected size before the timeout.
+     */
+    private boolean waitForTopology(ClusterService service, int expected, int timeout) {
+        long stop = System.currentTimeMillis() + timeout;
+
+        while (System.currentTimeMillis() < stop) {
+            if (service.topologyService().allMembers().size() >= expected)
+                return true;
+
+            try {
+                Thread.sleep(50);
+            }
+            catch (InterruptedException e) {
+                // Preserve the interruption status for the test runner.
+                Thread.currentThread().interrupt();
+
+                return false;
+            }
+        }
+
+        return false;
+    }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java b/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
index 01df2fe..255c56d 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
@@ -18,13 +18,13 @@
package org.apache.ignite.table.impl;
import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.table.TableSchemaManager;
+import org.apache.ignite.internal.table.TableSchemaView;
import org.jetbrains.annotations.NotNull;
/**
* Dummy schema manager for tests.
*/
-public class DummySchemaManagerImpl implements TableSchemaManager {
+public class DummySchemaManagerImpl implements TableSchemaView {
/** Schema. */
private final SchemaDescriptor schema;
diff --git a/modules/vault/pom.xml b/modules/vault/pom.xml
index 04c49da..5f640f1 100644
--- a/modules/vault/pom.xml
+++ b/modules/vault/pom.xml
@@ -31,4 +31,11 @@
<artifactId>ignite-vault</artifactId>
<version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.jetbrains</groupId>
+ <artifactId>annotations</artifactId>
+ </dependency>
+ </dependencies>
</project>
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
index f3d7f48..cfc3fd0 100644
--- a/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/VaultManager.java
@@ -17,6 +17,9 @@
package org.apache.ignite.internal.vault;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.vault.common.VaultEntry;
+
/**
* VaultManager is responsible for handling VaultService lifecycle and providing interface for managing local keys.
*/
@@ -31,4 +34,11 @@ public class VaultManager {
}
// TODO: IGNITE-14405 Local persistent key-value storage (Vault).
+
+ /**
+ * This is a proxy to the Vault service {@code get} method.
+ * NOTE(review): currently a stub that always returns {@code null} rather than a future,
+ * so callers must not dereference the result until the vault is implemented (see the TODO above).
+ *
+ * @param key Key bytes.
+ * @return Future with the vault entry for the key; {@code null} for now.
+ */
+ public CompletableFuture<VaultEntry> get(byte[] key) {
+ return null;
+ }
}
diff --git a/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java
new file mode 100644
index 0000000..d35e245
--- /dev/null
+++ b/modules/vault/src/main/java/org/apache/ignite/internal/vault/common/VaultEntry.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.vault.common;
+
+import java.io.Serializable;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Representation of vault entry.
+ */
+public class VaultEntry implements Serializable {
+    /** Key. */
+    private final byte[] key;
+
+    /** Value; {@code null} denotes an empty entry. */
+    private final byte[] val;
+
+    /**
+     * Constructs {@code VaultEntry} instance from the given key and value.
+     * The arrays are stored as is, without copying.
+     *
+     * @param key Key as a {@code byte[]}.
+     * @param val Value as a {@code byte[]}, or {@code null} for an empty entry.
+     */
+    public VaultEntry(byte[] key, byte[] val) {
+        this.key = key;
+        this.val = val;
+    }
+
+    /**
+     * Gets a key bytes.
+     *
+     * @return Byte array.
+     */
+    public @NotNull byte[] key() {
+        return key;
+    }
+
+    /**
+     * Gets a value bytes.
+     *
+     * @return Byte array, or {@code null} if the entry is empty (see {@link #empty()}).
+     */
+    public byte[] value() {
+        return val;
+    }
+
+    /**
+     * Returns value which denotes whether entry is empty or not.
+     *
+     * @return {@code True} if entry is empty, otherwise - {@code false}.
+     */
+    public boolean empty() {
+        return val == null;
+    }
+}
diff --git a/parent/pom.xml b/parent/pom.xml
index 7cb414a..11b804a 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -797,6 +797,7 @@
<version>false</version>
<additionalJOptions>${javadoc.opts}</additionalJOptions>
<excludePackageNames>com.facebook.presto.*</excludePackageNames>
+ <sourcepath>src/main/java:target/generated-sources/annotations</sourcepath>
<links>
<link>https://ignite.apache.org/releases/latest/javadoc/</link>
</links>