You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/10/08 16:21:59 UTC
[ignite-3] branch ignite-3.0.0-alpha3 updated: IGNITE-15491 Added
setBaseline method and naive rebalance. Fixes #379
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch ignite-3.0.0-alpha3
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0.0-alpha3 by this push:
new 1147ff3 IGNITE-15491 Added setBaseline method and naive rebalance. Fixes #379
1147ff3 is described below
commit 1147ff33e7f1daa941819548eb87e55d766b9e60
Author: Kirill Gusakov <kg...@gmail.com>
AuthorDate: Fri Oct 8 19:20:18 2021 +0300
IGNITE-15491 Added setBaseline method and naive rebalance. Fixes #379
Signed-off-by: Slava Koptilin <sl...@gmail.com>
(cherry picked from commit dd45a40733022ef342e45d0646e27f1b4c518588)
---
.../src/main/java/org/apache/ignite/Ignite.java | 21 ++
.../ignite/internal/client/TcpIgniteClient.java | 8 +
.../org/apache/ignite/client/fakes/FakeIgnite.java | 8 +
.../java/org/apache/ignite/internal/raft/Loza.java | 82 +++++++-
.../raft/jraft/rpc/impl/RaftGroupServiceImpl.java | 42 +++-
.../internal/runner/app/ITBaselineChangesTest.java | 168 ++++++++++++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 6 +
.../apache/ignite/internal/table/TableImpl.java | 12 ++
.../internal/table/distributed/TableManager.java | 212 ++++++++++++++++++++-
.../distributed/storage/InternalTableImpl.java | 10 +
10 files changed, 560 insertions(+), 9 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/Ignite.java b/modules/api/src/main/java/org/apache/ignite/Ignite.java
index 7d3cbbd..1eda994 100644
--- a/modules/api/src/main/java/org/apache/ignite/Ignite.java
+++ b/modules/api/src/main/java/org/apache/ignite/Ignite.java
@@ -17,8 +17,11 @@
package org.apache.ignite;
+import java.util.Set;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.table.manager.IgniteTables;
import org.apache.ignite.tx.IgniteTransactions;
+import org.jetbrains.annotations.ApiStatus.Experimental;
/**
* Ignite node interface. Main entry-point for all Ignite APIs.
@@ -44,4 +47,22 @@ public interface Ignite extends AutoCloseable {
* @return Ignite transactions.
*/
IgniteTransactions transactions();
+
+ /**
+ * Sets new baseline nodes for table assignments.
+ *
+ * The current implementation has significant restrictions:
+ * - Only alive nodes can be part of the new baseline.
+ * If any of the passed nodes is not alive, {@link IgniteException} with an appropriate message is thrown.
+ * - This can potentially be a long operation, and the current
+ * synchronous changePeers-based implementation does not handle this well.
+ * - No recovery logic is supported: if setBaseline fails, the cluster can be left in an arbitrary state.
+ *
+ * TODO: IGNITE-14209 issues above must be fixed.
+ *
+ * @param baselineNodes Names of baseline nodes.
+ * @throws IgniteException If {@code baselineNodes} is {@code null} or empty, or any node is not alive.
+ * @throws UnsupportedOperationException If the implementation does not support baseline changes.
+ */
+ @Experimental
+ void setBaseline(Set<String> baselineNodes);
}
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
index 0ffb12b..ed9bf1d 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpIgniteClient.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.client;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
@@ -101,6 +102,13 @@ public class TcpIgniteClient implements IgniteClient {
return null;
}
+ /**
+ * {@inheritDoc}
+ *
+ * Not supported by this client implementation: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void setBaseline(Set<String> baselineNodes) {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override public void close() throws Exception {
ch.close();
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
index 45a94f6..e46ac6d 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgnite.java
@@ -17,6 +17,7 @@
package org.apache.ignite.client.fakes;
+import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.processors.query.calcite.QueryProcessor;
import org.apache.ignite.table.manager.IgniteTables;
@@ -50,6 +51,13 @@ public class FakeIgnite implements Ignite {
return null;
}
+ /**
+ * {@inheritDoc}
+ *
+ * Not supported by this test fake: always throws {@link UnsupportedOperationException}.
+ */
+ @Override public void setBaseline(Set<String> baselineNodes) {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override public void close() {
// No-op.
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index df2ea1d..04427e5 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.raft;
import java.nio.file.Path;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
@@ -30,6 +31,7 @@ import org.apache.ignite.internal.raft.server.RaftServer;
import org.apache.ignite.internal.raft.server.impl.JRaftServerImpl;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Peer;
@@ -38,11 +40,15 @@ import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.jraft.util.Utils;
+import org.jetbrains.annotations.ApiStatus.Experimental;
/**
* Best raft manager ever since 1982.
*/
public class Loza implements IgniteComponent {
+ /** Ignite logger for this component (category must be the owning class, not the logger class itself). */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(Loza.class);
+
/** Factory. */
private static final RaftMessagesFactory FACTORY = new RaftMessagesFactory();
@@ -55,7 +61,12 @@ public class Loza implements IgniteComponent {
private static final int CLIENT_POOL_SIZE = Math.min(Utils.cpus() * 3, 20);
/** Timeout. */
- private static final int TIMEOUT = 1000;
+ // Default client retry timeout passed as clientTimeout to raft group services (presumably milliseconds).
+ // TODO: IGNITE-15705 Correct value should be investigated
+ private static final int TIMEOUT = 10000;
+
+ /** Default network timeout passed as networkTimeout to raft group services (presumably milliseconds). */
+ // TODO: IGNITE-15705 Correct value should be investigated
+ private static final int NETWORK_TIMEOUT = 3000;
/** Retry delay. */
private static final int DELAY = 200;
@@ -103,15 +114,23 @@ public class Loza implements IgniteComponent {
* Creates a raft group service providing operations on a raft group.
* If {@code nodes} contains the current node, then raft group starts on the current node.
*
+ * IMPORTANT: DON'T USE. This method should be used only for long running changePeers requests - until
+ * IGNITE-14209 will be fixed with stable solution.
+ *
* @param groupId Raft group id.
* @param nodes Raft group nodes.
* @param lsnrSupplier Raft group listener supplier.
+ * @param clientTimeout Client retry timeout.
+ * @param networkTimeout Client network timeout.
* @return Future representing pending completion of the operation.
*/
+ @Experimental
public CompletableFuture<RaftGroupService> prepareRaftGroup(
String groupId,
List<ClusterNode> nodes,
- Supplier<RaftGroupListener> lsnrSupplier) {
+ Supplier<RaftGroupListener> lsnrSupplier,
+ int clientTimeout,
+ int networkTimeout) {
assert !nodes.isEmpty();
List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
@@ -125,7 +144,8 @@ public class Loza implements IgniteComponent {
groupId,
clusterNetSvc,
FACTORY,
- TIMEOUT,
+ clientTimeout,
+ networkTimeout,
peers,
true,
DELAY,
@@ -134,6 +154,23 @@ public class Loza implements IgniteComponent {
}
/**
+ * Creates a raft group service providing operations on a raft group, using the default
+ * client retry timeout and the default network timeout.
+ * If {@code nodes} contains the current node, then raft group starts on the current node.
+ *
+ * @param groupId Raft group id.
+ * @param nodes Raft group nodes.
+ * @param lsnrSupplier Raft group listener supplier.
+ * @return Future representing pending completion of the operation.
+ */
+ public CompletableFuture<RaftGroupService> prepareRaftGroup(
+ String groupId,
+ List<ClusterNode> nodes,
+ Supplier<RaftGroupListener> lsnrSupplier
+ ) {
+ int clientTimeout = TIMEOUT;
+ int networkTimeout = NETWORK_TIMEOUT;
+
+ return prepareRaftGroup(groupId, nodes, lsnrSupplier, clientTimeout, networkTimeout);
+ }
+
+ /**
* Stops a raft group on the current node if {@code nodes} contains the current node.
*
* @param groupId Raft group id.
@@ -147,4 +184,43 @@ public class Loza implements IgniteComponent {
if (nodes.stream().anyMatch(n -> locNodeName.equals(n.name())))
raftServer.stopRaftGroup(groupId);
}
+
+ /**
+ * Creates a raft group service providing operations on a raft group whose peer set is being updated.
+ * If {@code deltaNodes} contains the current node, then raft group starts on the current node.
+ *
+ * @param groupId Raft group id.
+ * @param nodes Full set of raft group nodes.
+ * @param deltaNodes New raft group nodes; a raft group is started locally only if the current node is among them.
+ * @param lsnrSupplier Raft group listener supplier.
+ * @return Future representing pending completion of the operation.
+ */
+ public CompletableFuture<RaftGroupService> updateRaftGroup(
+ String groupId,
+ Collection<ClusterNode> nodes,
+ Collection<ClusterNode> deltaNodes,
+ Supplier<RaftGroupListener> lsnrSupplier
+ ) {
+ assert !nodes.isEmpty();
+
+ List<Peer> peers = nodes.stream().map(n -> new Peer(n.address())).collect(Collectors.toList());
+
+ String locNodeName = clusterNetSvc.topologyService().localMember().name();
+
+ if (deltaNodes.stream().anyMatch(n -> locNodeName.equals(n.name()))) {
+ // A failed local start is only logged: the client service below is created regardless.
+ if (!raftServer.startRaftGroup(groupId, lsnrSupplier.get(), peers))
+ LOG.error("Failed to start raft group [groupId=" + groupId + ", node=" + locNodeName + ']');
+ }
+
+ // Use the same default retry and network timeouts as prepareRaftGroup(groupId, nodes, lsnrSupplier).
+ return RaftGroupServiceImpl.start(
+ groupId,
+ clusterNetSvc,
+ FACTORY,
+ TIMEOUT,
+ NETWORK_TIMEOUT,
+ peers,
+ true,
+ DELAY,
+ executor
+ );
+ }
}
diff --git a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
index 081f02d..4b847a0 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/rpc/impl/RaftGroupServiceImpl.java
@@ -73,6 +73,9 @@ public class RaftGroupServiceImpl implements RaftGroupService {
/** */
private volatile long timeout;
+ /** Timeout passed to each network {@code invoke} call made to a peer (separate from the retry {@code timeout}). */
+ private final long networkTimeout;
+
/** */
private final String groupId;
@@ -114,6 +117,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
ClusterService cluster,
RaftMessagesFactory factory,
int timeout,
+ int networkTimeout,
List<Peer> peers,
Peer leader,
long retryDelay,
@@ -124,6 +128,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
this.learners = Collections.emptyList();
this.factory = factory;
this.timeout = timeout;
+ this.networkTimeout = networkTimeout;
this.groupId = groupId;
this.retryDelay = retryDelay;
this.leader = leader;
@@ -137,6 +142,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
* @param cluster Cluster service.
* @param factory Message factory.
* @param timeout Timeout.
+ * @param netTimeout Network call timeout.
* @param peers List of all peers.
* @param getLeader {@code True} to get the group's leader upon service creation.
* @param retryDelay Retry delay.
@@ -148,12 +154,13 @@ public class RaftGroupServiceImpl implements RaftGroupService {
ClusterService cluster,
RaftMessagesFactory factory,
int timeout,
+ int netTimeout,
List<Peer> peers,
boolean getLeader,
long retryDelay,
ScheduledExecutorService executor
) {
- var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, peers, null, retryDelay, executor);
+ var service = new RaftGroupServiceImpl(groupId, cluster, factory, timeout, netTimeout, peers, null, retryDelay, executor);
if (!getLeader)
return CompletableFuture.completedFuture(service);
@@ -166,6 +173,32 @@ public class RaftGroupServiceImpl implements RaftGroupService {
});
}
+ /**
+ * Starts raft group service, using {@code timeout} both as the retry timeout and as the
+ * timeout of every single network call.
+ *
+ * @param groupId Raft group id.
+ * @param cluster Cluster service.
+ * @param factory Message factory.
+ * @param timeout Timeout, also used as the network call timeout.
+ * @param peers List of all peers.
+ * @param getLeader {@code True} to get the group's leader upon service creation.
+ * @param retryDelay Retry delay.
+ * @param executor Executor for retrying requests.
+ * @return Future representing pending completion of the operation.
+ */
+ public static CompletableFuture<RaftGroupService> start(
+ String groupId,
+ ClusterService cluster,
+ RaftMessagesFactory factory,
+ int timeout,
+ List<Peer> peers,
+ boolean getLeader,
+ long retryDelay,
+ ScheduledExecutorService executor
+ ) {
+ int netTimeout = timeout;
+
+ return start(groupId, cluster, factory, timeout, netTimeout, peers, getLeader, retryDelay, executor);
+ }
+
/** {@inheritDoc} */
@Override public @NotNull String groupId() {
return groupId;
@@ -456,7 +489,7 @@ public class RaftGroupServiceImpl implements RaftGroupService {
return;
}
- CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, timeout);
+ CompletableFuture<?> fut0 = cluster.messagingService().invoke(peer.address(), (NetworkMessage) req, networkTimeout);
fut0.whenComplete(new BiConsumer<Object, Throwable>() {
@Override public void accept(Object resp, Throwable err) {
@@ -487,7 +520,10 @@ public class RaftGroupServiceImpl implements RaftGroupService {
return null;
}, retryDelay, TimeUnit.MILLISECONDS);
}
- else if (resp0.errorCode() == RaftError.EPERM.getNumber()) {
+ else if (resp0.errorCode() == RaftError.EPERM.getNumber() ||
+ // TODO: IGNITE-15706
+ resp0.errorCode() == RaftError.UNKNOWN.getNumber() ||
+ resp0.errorCode() == RaftError.EINTERNAL.getNumber()) {
if (resp0.leaderId() == null) {
executor.schedule(() -> {
sendWithRetry(randomNode(), req, stopTime, fut);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITBaselineChangesTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITBaselineChangesTest.java
new file mode 100644
index 0000000..c4fd2f3
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ITBaselineChangesTest.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.runner.app;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import com.google.common.collect.Lists;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgnitionManager;
+import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.schema.SchemaBuilders;
+import org.apache.ignite.schema.definition.ColumnType;
+import org.apache.ignite.schema.definition.TableDefinition;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Integration test for baseline changes.
+ */
+@ExtendWith(WorkDirectoryExtension.class)
+public class ITBaselineChangesTest {
+ /** Start network port for test nodes. */
+ private static final int BASE_PORT = 3344;
+
+ /** Number of nodes in the initial cluster; their addresses are the seed list of every node. */
+ private static final int INIT_CLUSTER_SIZE = 3;
+
+ /** Bootstrap configurations of the initial cluster nodes (node name to config), in start order. */
+ private final Map<String, String> initClusterNodes = new LinkedHashMap<>();
+
+ /** Started nodes; closed in reverse start order after each test. */
+ private final List<Ignite> clusterNodes = new ArrayList<>();
+
+ /** Work directory for node data. */
+ @WorkDirectory
+ private Path workDir;
+
+ /**
+ * Prepares configurations of the initial cluster. Node 0 is the metastorage node.
+ */
+ @BeforeEach
+ void setUp(TestInfo testInfo) {
+ String metastoreNodeName = testNodeName(testInfo, nodePort(0));
+
+ for (int i = 0; i < INIT_CLUSTER_SIZE; i++) {
+ String nodeName = testNodeName(testInfo, nodePort(i));
+
+ initClusterNodes.put(nodeName, buildConfig(metastoreNodeName, i));
+ }
+ }
+
+ /** Stops all started nodes. */
+ @AfterEach
+ void tearDown() throws Exception {
+ IgniteUtils.closeAll(Lists.reverse(clusterNodes));
+ }
+
+ /**
+ * Checks that after extending the cluster with two new nodes, moving the baseline to them
+ * (plus the metastorage node) and stopping the remaining initial nodes, previously inserted
+ * data is still readable from a new node.
+ */
+ @Test
+ void testBaselineExtending(TestInfo testInfo) {
+ initClusterNodes.forEach((nodeName, configStr) ->
+ clusterNodes.add(IgnitionManager.start(nodeName, configStr, workDir.resolve(nodeName)))
+ );
+
+ assertEquals(INIT_CLUSTER_SIZE, clusterNodes.size());
+
+ // Create table on node 0.
+ TableDefinition schTbl1 = SchemaBuilders.tableBuilder("PUBLIC", "tbl1").columns(
+ SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+ SchemaBuilders.column("val", ColumnType.INT32).asNullable().build()
+ ).withPrimaryKey("key").build();
+
+ clusterNodes.get(0).tables().createTable(schTbl1.canonicalName(), tblCh ->
+ SchemaConfigurationConverter.convert(schTbl1, tblCh)
+ .changeReplicas(5)
+ .changePartitions(1)
+ );
+
+ // Put data on node 1.
+ Table tbl1 = clusterNodes.get(1).tables().table(schTbl1.canonicalName());
+ RecordView<Tuple> recView1 = tbl1.recordView();
+
+ recView1.insert(Tuple.create().set("key", 1L).set("val", 111));
+
+ var metaStoreNode = clusterNodes.get(0);
+
+ var node3Name = testNodeName(testInfo, nodePort(3));
+ var node4Name = testNodeName(testInfo, nodePort(4));
+
+ // Start 2 new nodes after the table has been created and populated.
+ var node3 = IgnitionManager.start(
+ node3Name, buildConfig(metaStoreNode.name(), 3), workDir.resolve(node3Name));
+
+ clusterNodes.add(node3);
+
+ var node4 = IgnitionManager.start(
+ node4Name, buildConfig(metaStoreNode.name(), 4), workDir.resolve(node4Name));
+
+ clusterNodes.add(node4);
+
+ // Update baseline to nodes 0, 3, 4.
+ metaStoreNode.setBaseline(Set.of(metaStoreNode.name(), node3Name, node4Name));
+
+ // Stop nodes 1 and 2, which are no longer in the baseline.
+ IgnitionManager.stop(clusterNodes.get(1).name());
+ IgnitionManager.stop(clusterNodes.get(2).name());
+
+ Table tbl4 = node4.tables().table(schTbl1.canonicalName());
+
+ final Tuple keyTuple1 = Tuple.create().set("key", 1L);
+
+ Tuple row = tbl4.recordView().get(keyTuple1);
+
+ assertNotNull(row);
+ assertEquals(1, (Long) row.value("key"));
+ assertEquals(111, (Integer) row.value("val"));
+ }
+
+ /**
+ * Builds a node configuration.
+ *
+ * @param metastoreNodeName Name of the metastorage node.
+ * @param nodeIdx Index of the node, used to derive its network port.
+ * @return Node configuration in HOCON format.
+ */
+ private String buildConfig(String metastoreNodeName, int nodeIdx) {
+ StringBuilder seeds = new StringBuilder();
+
+ for (int i = 0; i < INIT_CLUSTER_SIZE; i++) {
+ if (i > 0)
+ seeds.append(", ");
+
+ seeds.append("\"localhost:").append(nodePort(i)).append('"');
+ }
+
+ return "{\n" +
+ " node.metastorageNodes: [ \"" + metastoreNodeName + "\" ],\n" +
+ " network: {\n" +
+ " port: " + nodePort(nodeIdx) + "\n" +
+ " netClusterNodes: [ " + seeds + " ]\n" +
+ " }\n" +
+ "}";
+ }
+
+ /**
+ * @param nodeIdx Node index.
+ * @return Network port of the node with the given index.
+ */
+ private int nodePort(int nodeIdx) {
+ return BASE_PORT + nodeIdx;
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index f68ab1e..113e9cd 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
@@ -343,6 +344,11 @@ public class IgniteImpl implements Ignite {
return name;
}
+ /** {@inheritDoc} */
+ @Override public void setBaseline(Set<String> baselineNodes) {
+ // Baseline changes are delegated to the distributed table manager.
+ this.distributedTblMgr.setBaseline(baselineNodes);
+ }
+
/**
* @return Node configuration.
*/
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 5247e34..22f9b9c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.table;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.schema.definition.SchemaManagementMode;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
@@ -110,4 +112,14 @@ public class TableImpl implements Table {
public void schemaMode(SchemaManagementMode schemaMode) {
this.tbl.schema(schemaMode);
}
+
+ /**
+ * Replaces the raft group service used by the internal table for the given partition.
+ * Assumes the internal table is an {@link InternalTableImpl}; otherwise the cast fails.
+ *
+ * @param p Partition.
+ * @param raftGrpSvc Raft group service.
+ */
+ public void updateInternalTableRaftGroupService(int p, RaftGroupService raftGrpSvc) {
+ InternalTableImpl internalTbl = (InternalTableImpl)tbl;
+
+ internalTbl.updateInternalTableRaftGroupService(p, raftGrpSvc);
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 73d4bdc..a23c426 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,6 +34,9 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
@@ -81,6 +85,11 @@ import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.client.Peer;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.manager.IgniteTables;
@@ -142,7 +151,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
//TODO: IGNITE-15161 These should go into TableImpl instances.
/** Instances of table storages that need to be stopped on component stop. */
- private final Set<TableStorage> tableStorages = ConcurrentHashMap.newKeySet();
+ private final Map<IgniteUuid, TableStorage> tableStorages = new ConcurrentHashMap<>();
/**
* Creates a new table manager.
@@ -232,6 +241,51 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
});
+ ((ExtendedTableConfiguration)tablesCfg.tables().get(ctx.newValue().name())).assignments().
+ listen(assignmentsCtx -> {
+ List<List<ClusterNode>> oldAssignments =
+ (List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.oldValue());
+
+ List<List<ClusterNode>> newAssignments =
+ (List<List<ClusterNode>>)ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+ CompletableFuture<?>[] futures = new CompletableFuture<?>[oldAssignments.size()];
+
+ // TODO: IGNITE-15554 Add logic for assignment recalculation in case of partitions or replicas changes
+ // TODO: Until IGNITE-15554 is implemented it's safe to iterate over partitions and replicas cause there will
+ // TODO: be exact same amount of partitions and replicas for both old and new assignments
+ for (int i = 0; i < oldAssignments.size(); i++) {
+ final int p = i;
+
+ List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
+ List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
+
+ var toAdd = new HashSet<>(newPartitionAssignment);
+ var toRemove = new HashSet<>(oldPartitionAssignment);
+
+ toAdd.removeAll(oldPartitionAssignment);
+ toRemove.removeAll(newPartitionAssignment);
+
+ // Create new raft nodes according to new assignments.
+ futures[i] = raftMgr.updateRaftGroup(
+ raftGroupName(tblId, p),
+ newPartitionAssignment,
+ toAdd,
+ () -> new PartitionListener(tableStorages.get(tblId).getOrCreatePartition(p))
+ )
+ .thenAccept(
+ updatedRaftGroupService -> tables.get(ctx.newValue().name()).updateInternalTableRaftGroupService(p, updatedRaftGroupService)
+ ).thenRun(() -> raftMgr.stopRaftGroup(raftGroupName(tblId, p), new ArrayList<>(toRemove))
+ ).exceptionally(th -> {
+ LOG.error("Failed to update raft groups one the node", th);
+ return null;
+ }
+ );
+ }
+
+ return CompletableFuture.allOf(futures);
+ });
+
createTableLocally(
ctx.newValue().name(),
IgniteUuid.fromString(((ExtendedTableView)ctx.newValue()).id()),
@@ -275,7 +329,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** {@inheritDoc} */
@Override public void stop() {
- for (TableStorage tableStorage : tableStorages) {
+ for (TableStorage tableStorage : tableStorages.values()) {
try {
tableStorage.stop();
}
@@ -334,7 +388,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
tableStorage.start();
- tableStorages.add(tableStorage);
+ tableStorages.put(tblId, tableStorage);
for (int p = 0; p < partitions; p++) {
int partId = p;
@@ -871,4 +925,156 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
private boolean isTableConfigured(String name) {
return tableNamesConfigured().contains(name);
}
+
+ /**
+ * Changes the baseline: recalculates assignments of all tables over the given nodes and moves
+ * partition raft groups accordingly.
+ *
+ * The rebalance is done in two steps: assignments are first extended to all current cluster members
+ * and only then, if the requested baseline differs, narrowed down to it (presumably so that new
+ * replicas are added before old ones are removed).
+ *
+ * @param nodes Names of the new baseline nodes.
+ * @throws IgniteException If {@code nodes} is {@code null} or empty, or contains a node that is not
+ * a current cluster member.
+ * @see Ignite#setBaseline(Set)
+ */
+ public void setBaseline(Set<String> nodes) {
+ if (nodes == null || nodes.isEmpty())
+ throw new IgniteException("New baseline can't be null or empty");
+
+ var currClusterMembers = new HashSet<>(baselineMgr.nodes());
+
+ var currClusterMemberNames =
+ currClusterMembers.stream().map(ClusterNode::name).collect(Collectors.toSet());
+
+ for (String nodeName : nodes) {
+ if (!currClusterMemberNames.contains(nodeName))
+ throw new IgniteException("Node '" + nodeName + "' not in current network cluster membership. " +
+ "Adding not alive nodes is not supported yet.");
+ }
+
+ var newBaseline = currClusterMembers
+ .stream().filter(n -> nodes.contains(n.name())).collect(Collectors.toSet());
+
+ // Step 1: spread assignments over every alive node.
+ updateAssignments(currClusterMembers);
+
+ // Step 2: shrink to the requested baseline, if it is narrower.
+ if (!newBaseline.equals(currClusterMembers))
+ updateAssignments(newBaseline);
+ }
+
+ /**
+ * Updates assignments of all current tables according to the given nodes and waits until the raft
+ * groups of the affected partitions have been moved to the new peers.
+ * This approach has known issues, see {@link Ignite#setBaseline(Set)}.
+ *
+ * @param clusterNodes Set of nodes for assignment.
+ */
+ private void updateAssignments(Set<ClusterNode> clusterNodes) {
+ // Raft topology updates to run once the configuration change is applied. The list is cleared at the
+ // start of the change closure, presumably because the closure may be invoked more than once.
+ var changePeersQueue = new ArrayList<Supplier<CompletableFuture<Void>>>();
+
+ tablesCfg.tables().change(tbls -> {
+ changePeersQueue.clear();
+
+ for (int i = 0; i < tbls.size(); i++) {
+ tbls.createOrUpdate(tbls.get(i).name(), changeX -> {
+ ExtendedTableChange change = (ExtendedTableChange)changeX;
+ byte[] currAssignments = change.assignments();
+
+ List<List<ClusterNode>> recalculatedAssignments = AffinityUtils.calculateAssignments(
+ clusterNodes,
+ change.partitions(),
+ change.replicas());
+
+ if (!recalculatedAssignments.equals(ByteUtils.fromBytes(currAssignments))) {
+ change.changeAssignments(ByteUtils.toBytes(recalculatedAssignments));
+
+ changePeersQueue.add(() ->
+ updateRaftTopology(
+ (List<List<ClusterNode>>)ByteUtils.fromBytes(currAssignments),
+ recalculatedAssignments,
+ IgniteUuid.fromString(change.id())));
+ }
+ });
+ }
+ }).thenCompose(v -> {
+ CompletableFuture<?>[] changePeersFutures = new CompletableFuture<?>[changePeersQueue.size()];
+
+ for (int i = 0; i < changePeersFutures.length; i++)
+ changePeersFutures[i] = changePeersQueue.get(i).get();
+
+ return CompletableFuture.allOf(changePeersFutures);
+ }).join(); // Blocks the caller; failures surface as CompletionException.
+ }
+
+ /**
+ * Updates raft groups of table partitions to new peers lists.
+ *
+ * For every partition a raft group service is prepared over the old peers with extended timeouts,
+ * and {@code changePeers} is invoked with the new peers. Per-partition failures are logged and
+ * swallowed, so the returned future completes normally even if some partitions were not moved.
+ *
+ * @param oldAssignments Old assignment.
+ * @param newAssignments New assignment.
+ * @param tblId Table ID.
+ * @return Future, which completes, when update finished.
+ */
+ private CompletableFuture<Void> updateRaftTopology(
+ List<List<ClusterNode>> oldAssignments,
+ List<List<ClusterNode>> newAssignments,
+ IgniteUuid tblId) {
+ // Extended timeouts for the potentially long-running changePeers request.
+ final int changePeersClientTimeout = 60000;
+ final int changePeersNetworkTimeout = 10000;
+
+ CompletableFuture<?>[] futures = new CompletableFuture<?>[oldAssignments.size()];
+
+ // TODO: IGNITE-15554 Add logic for assignment recalculation in case of partitions or replicas changes
+ // TODO: Until IGNITE-15554 is implemented it's safe to iterate over partitions and replicas cause there will
+ // TODO: be exact same amount of partitions and replicas for both old and new assignments
+ for (int i = 0; i < oldAssignments.size(); i++) {
+ final int p = i;
+
+ List<ClusterNode> oldPartitionAssignment = oldAssignments.get(p);
+ List<ClusterNode> newPartitionAssignment = newAssignments.get(p);
+
+ // NOTE(review): presumably the group already runs on the old peers, so this listener is not
+ // expected to be used - confirm.
+ futures[i] = raftMgr.prepareRaftGroup(
+ raftGroupName(tblId, p),
+ oldPartitionAssignment,
+ TableManager::noopRaftGroupListener,
+ changePeersClientTimeout,
+ changePeersNetworkTimeout
+ )
+ .thenCompose(updatedRaftGroupService -> updatedRaftGroupService.changePeers(
+ newPartitionAssignment.stream().map(n -> new Peer(n.address())).collect(Collectors.toList())))
+ .exceptionally(th -> {
+ LOG.error("Failed to update raft peers for group " + raftGroupName(tblId, p) +
+ " from " + oldPartitionAssignment + " to " + newPartitionAssignment, th);
+ return null;
+ });
+ }
+
+ return CompletableFuture.allOf(futures);
+ }
+
+ /**
+ * Creates a raft group listener that does nothing on reads, writes, snapshot save and shutdown,
+ * and reports snapshot load as failed.
+ *
+ * @return New no-op raft group listener.
+ */
+ private static RaftGroupListener noopRaftGroupListener() {
+ return new RaftGroupListener() {
+ @Override public void onRead(Iterator<CommandClosure<ReadCommand>> iterator) {
+ // No-op.
+ }
+
+ @Override public void onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
+ // No-op.
+ }
+
+ @Override public void onSnapshotSave(Path path, Consumer<Throwable> doneClo) {
+ // No-op.
+ }
+
+ @Override public boolean onSnapshotLoad(Path path) {
+ return false;
+ }
+
+ @Override public void onShutdown() {
+ // No-op.
+ }
+ };
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index eab54c3..3bb5c15 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -374,6 +374,16 @@ public class InternalTableImpl implements InternalTable {
});
}
+ /**
+ * Updates internal table raft group service for given partition by putting it into the
+ * partition map, replacing any service previously registered for that partition.
+ *
+ * @param p Partition.
+ * @param raftGrpSvc Raft group service.
+ */
+ public void updateInternalTableRaftGroupService(int p, RaftGroupService raftGrpSvc) {
+ partitionMap.put(p, raftGrpSvc);
+ }
+
/** Partition scan publisher. */
private class PartitionScanPublisher implements Publisher<BinaryRow> {
/** {@link Publisher<BinaryRow>} that relatively notifies about partition rows. */