You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/05/13 14:50:26 UTC
[ignite-3] branch main updated: IGNITE-14237 Implemented affinity calculation based on events. Fixes #120
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 865a066 IGNITE-14237 Implemented affinity calculation based on events. Fixes #120
865a066 is described below
commit 865a066a7f431ef5455533f642a42862d40308b3
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Thu May 13 17:49:50 2021 +0300
IGNITE-14237 Implemented affinity calculation based on events. Fixes #120
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../ignite/internal/affinity/AffinityManager.java | 170 ++++++-------
.../internal/affinity/event/AffinityEvent.java | 31 +++
.../affinity/event/AffinityEventParameters.java | 64 +++++
.../ignite/internal/baseline/BaselineManager.java | 1 +
.../internal/metastorage/MetaStorageManager.java | 40 ++-
.../apache/ignite/internal/app/IgnitionImpl.java | 3 +-
modules/table/pom.xml | 11 +-
.../internal/table/distributed/TableManager.java | 279 ++++++++++-----------
8 files changed, 350 insertions(+), 249 deletions(-)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 4619b11..6572a43 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -18,14 +18,16 @@
package org.apache.ignite.internal.affinity;
import java.nio.charset.StandardCharsets;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import org.apache.ignite.configuration.internal.ConfigurationManager;
-import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.affinity.event.AffinityEvent;
+import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
@@ -36,120 +38,73 @@ import org.apache.ignite.metastorage.common.Key;
import org.apache.ignite.metastorage.common.Operations;
import org.apache.ignite.metastorage.common.WatchEvent;
import org.apache.ignite.metastorage.common.WatchListener;
+import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.NotNull;
/**
* Affinity manager is responsible for affinity function related logic including calculating affinity assignments.
*/
-public class AffinityManager {
+public class AffinityManager extends Producer<AffinityEvent, AffinityEventParameters> {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(AffinityManager.class);
- /** Tables prefix for the metasorage. */
- private static final String INTERNAL_PREFIX = "internal.tables.";
+ /** Meta storage key prefix of table affinity assignments; the remainder of such a key is the table id. */
+ private static final String INTERNAL_PREFIX = "internal.tables.assignment.";
/**
- * MetaStorage manager in order to watch private distributed affinity specific configuration,
- * cause ConfigurationManger handles only public configuration.
+ * MetaStorage manager in order to watch private distributed affinity specific configuration, cause
+ * ConfigurationManger handles only public configuration.
*/
private final MetaStorageManager metaStorageMgr;
- /** Configuration manager in order to handle and listen affinity specific configuration.*/
+ /** Configuration manager in order to handle and listen affinity specific configuration. */
private final ConfigurationManager configurationMgr;
/** Baseline manager. */
private final BaselineManager baselineMgr;
/** Vault manager. */
- private final VaultManager vaultManager;
-
- /** Affinity calculate subscription future. */
- private CompletableFuture<Long> affinityCalculateSubscriptionFut = null;
+ private final VaultManager vaultMgr;
/**
+ * Creates a new affinity manager.
+ *
* @param configurationMgr Configuration module.
* @param metaStorageMgr Meta storage service.
+ * @param baselineMgr Baseline manager.
+ * @param vaultMgr Vault manager.
*/
public AffinityManager(
ConfigurationManager configurationMgr,
MetaStorageManager metaStorageMgr,
BaselineManager baselineMgr,
- VaultManager vaultManager
+ VaultManager vaultMgr
) {
this.configurationMgr = configurationMgr;
this.metaStorageMgr = metaStorageMgr;
this.baselineMgr = baselineMgr;
- this.vaultManager = vaultManager;
-
- String localNodeName = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
- .name().value();
-
- configurationMgr.configurationRegistry().getConfiguration(ClusterConfiguration.KEY)
- .metastorageNodes().listen(ctx -> {
- if (ctx.newValue() != null) {
- if (MetaStorageManager.hasMetastorageLocally(localNodeName, ctx.newValue()))
- subscribeToAssignmentCalculation();
- else
- unsubscribeFromAssignmentCalculation();
- }
- return CompletableFuture.completedFuture(null);
- });
+ this.vaultMgr = vaultMgr;
- String[] metastorageMembers = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
- .metastorageNodes().value();
+ metaStorageMgr.registerWatchByPrefix(new Key(INTERNAL_PREFIX), new WatchListener() {
+ @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
+ for (WatchEvent evt : events) {
+ String tabIdVal = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
- if (MetaStorageManager.hasMetastorageLocally(localNodeName, metastorageMembers))
- subscribeToAssignmentCalculation();
- }
+ UUID tblId = UUID.fromString(tabIdVal);
- /**
- * Subscribes to meta storage members update.
- */
- private void subscribeToAssignmentCalculation() {
- assert affinityCalculateSubscriptionFut == null : "Affinity calculation already subscribed";
+ if (evt.newEntry().value() == null) {
+ assert evt.oldEntry().value() != null : "Previous assignment is unknown";
- String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
+ List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
+ evt.oldEntry().value());
- affinityCalculateSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
- @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
- for (WatchEvent evt : events) {
- byte[] assignmentVal = evt.newEntry().value();
-
- if (assignmentVal != null && assignmentVal.length == 0) {
- String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
-
- String placeholderValue = keyTail.substring(0, keyTail.indexOf('.'));
-
- UUID tblId = UUID.fromString(placeholderValue);
-
- try {
- String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
-
- int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
- .tables().get(name).partitions().value();
- int replicas = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
- .tables().get(name).replicas().value();
-
- var key = evt.newEntry().key();
- metaStorageMgr.invoke(
- Conditions.key(key).value().eq(assignmentVal),
- Operations.put(key, ByteUtils.toBytes(
- RendezvousAffinityFunction.assignPartitions(
- baselineMgr.nodes(),
- partitions,
- replicas,
- false,
- null
- ))),
- Operations.noop());
-
- LOG.info("Affinity manager calculated assignment for the table [name={}, tblId={}]",
- name, tblId);
- }
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to initialize affinity [key={}]",
- evt.newEntry().key().toString(), e);
- }
+ onEvent(AffinityEvent.REMOVED, new AffinityEventParameters(tblId, assignment), null);
+ }
+ else {
+ List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
+ evt.newEntry().value());
+
+ onEvent(AffinityEvent.CALCULATED, new AffinityEventParameters(tblId, assignment), null);
}
}
@@ -163,21 +118,50 @@ public class AffinityManager {
}
/**
- * Unsubscribes a listener form the affinity calculation.
+ * Calculates an assignment for a table which was specified by id.
+ *
+ * @param tblId Table identifier.
+ * @return A future which will complete when the assignment is calculated.
*/
- private void unsubscribeFromAssignmentCalculation() {
- if (affinityCalculateSubscriptionFut == null)
- return;
-
- try {
- Long subscriptionId = affinityCalculateSubscriptionFut.get();
+ public CompletableFuture<Boolean> calculateAssignments(UUID tblId) {
+ return vaultMgr
+ .get(ByteArray.fromString(INTERNAL_PREFIX + tblId))
+ .thenCompose(entry -> {
+ TableConfiguration tblConfig = configurationMgr
+ .configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY)
+ .tables()
+ .get(new String(entry.value(), StandardCharsets.UTF_8));
+
+ var key = new Key(INTERNAL_PREFIX + tblId);
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-14716 Need to support baseline changes.
+ return metaStorageMgr.invoke(
+ Conditions.key(key).value().eq(null),
+ Operations.put(key, ByteUtils.toBytes(
+ RendezvousAffinityFunction.assignPartitions(
+ baselineMgr.nodes(),
+ tblConfig.partitions().value(),
+ tblConfig.replicas().value(),
+ false,
+ null
+ ))),
+ Operations.noop());
+ });
+ }
- metaStorageMgr.unregisterWatch(subscriptionId);
+ /**
+ * Removes an assignment for a table which was specified by id.
+ *
+ * @param tblId Table identifier.
+ * @return A future which will complete when assignment is removed.
+ */
+ public CompletableFuture<Boolean> removeAssignment(UUID tblId) {
+ var key = new Key(INTERNAL_PREFIX + tblId);
- affinityCalculateSubscriptionFut = null;
- }
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Couldn't unsubscribe for Metastorage updates", e);
- }
+ return metaStorageMgr.invoke(
+ Conditions.key(key).value().ne(null),
+ Operations.remove(key),
+ Operations.noop());
}
}
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEvent.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEvent.java
new file mode 100644
index 0000000..25d2b6e
--- /dev/null
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity.event;
+
+import org.apache.ignite.internal.manager.Event;
+
+/**
+ * Events published by the affinity manager about table affinity assignments.
+ */
+public enum AffinityEvent implements Event {
+    /** Fired when the affinity assignment of a table has been calculated. */
+    CALCULATED,
+
+    /** Fired when the affinity assignment of a table has been removed. */
+    REMOVED;
+}
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEventParameters.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEventParameters.java
new file mode 100644
index 0000000..0bedd1a
--- /dev/null
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/event/AffinityEventParameters.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.affinity.event;
+
+import java.util.List;
+import java.util.UUID;
+import org.apache.ignite.internal.manager.EventParameters;
+import org.apache.ignite.network.ClusterNode;
+
+/**
+ * Affinity event parameters: the table identifier together with the affinity assignment the event refers to.
+ */
+public class AffinityEventParameters implements EventParameters {
+    /** Table identifier. */
+    private final UUID tableId;
+
+    /** Affinity assignment: for every partition (by index), the list of cluster nodes assigned to it. */
+    private final List<List<ClusterNode>> assignment;
+
+    /**
+     * @param tableId Table identifier.
+     * @param assignment Affinity assignment, one list of cluster nodes per partition.
+     */
+    public AffinityEventParameters(
+        UUID tableId,
+        List<List<ClusterNode>> assignment
+    ) {
+        this.tableId = tableId;
+        this.assignment = assignment;
+    }
+
+    /**
+     * Gets the table identifier.
+     *
+     * @return Table id.
+     */
+    public UUID tableId() {
+        return tableId;
+    }
+
+    /**
+     * Gets the affinity assignment.
+     *
+     * @return Affinity assignment, one list of cluster nodes per partition.
+     */
+    public List<List<ClusterNode>> assignment() {
+        return assignment;
+    }
+}
diff --git a/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java b/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
index be4d0fa..34e003d 100644
--- a/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
+++ b/modules/baseline/src/main/java/org/apache/ignite/internal/baseline/BaselineManager.java
@@ -27,6 +27,7 @@ import org.apache.ignite.network.ClusterService;
* Baseline manager is responsible for handling baseline related logic.
*/
// TODO: IGNITE-14586 Remove @SuppressWarnings when implementation provided.
+// TODO: https://issues.apache.org/jira/browse/IGNITE-14716 Adapt concept of baseline topology IEP-4.
@SuppressWarnings({"FieldCanBeLocal", "unused"}) public class BaselineManager {
/** Configuration manager in order to handle and listen baseline specific configuration.*/
private final ConfigurationManager configurationMgr;
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index 069560c..d6e15e8 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -130,7 +130,7 @@ public class MetaStorageManager {
Predicate<ClusterNode> metaStorageNodesContainsLocPred =
clusterNode -> Arrays.asList(metastorageNodes).contains(clusterNode.name());
- if (hasMetastorageLocally(locNodeName, metastorageNodes)) {
+ if (hasMetastorage(locNodeName, metastorageNodes)) {
this.metaStorageSvcFut = CompletableFuture.completedFuture(new MetaStorageServiceImpl(
raftMgr.startRaftGroup(
METASTORAGE_RAFT_GROUP_NAME,
@@ -509,24 +509,46 @@ public class MetaStorageManager {
}
/**
- * Checks whether the local node hosts meta storage.
+ * Checks whether the given node hosts meta storage.
*
- * @param locNodeName Local node uniq name.
+ * @param nodeName Node unique name.
* @param metastorageMembers Meta storage members names.
- * @return True if the node has meta storage, false otherwise.
+ * @return {@code true} if the node has meta storage, {@code false} otherwise.
*/
- public static boolean hasMetastorageLocally(String locNodeName, String[] metastorageMembers) {
- boolean isLocNodeHasMetasorage = false;
+ public static boolean hasMetastorage(String nodeName, String[] metastorageMembers) {
+ boolean isNodeHasMetasorage = false;
for (String name : metastorageMembers) {
- if (name.equals(locNodeName)) {
- isLocNodeHasMetasorage = true;
+ if (name.equals(nodeName)) {
+ isNodeHasMetasorage = true;
break;
}
}
- return isLocNodeHasMetasorage;
+ return isNodeHasMetasorage;
+ }
+
+    /**
+     * Tells whether the local node is listed among the meta storage members.
+     * Both the local node name and the member names are read from the node configuration.
+     *
+     * @param configurationMgr Configuration manager to read the node configuration from.
+     * @return {@code true} if the local node hosts meta storage, {@code false} otherwise.
+     */
+    public static boolean hasMetastorageLocally(ConfigurationManager configurationMgr) {
+        // Name of this node.
+        String nodeName = configurationMgr.configurationRegistry()
+            .getConfiguration(NodeConfiguration.KEY)
+            .name().value();
+
+        // Names of the nodes that host meta storage.
+        String[] members = configurationMgr.configurationRegistry()
+            .getConfiguration(NodeConfiguration.KEY)
+            .metastorageNodes().value();
+
+        // Delegate the membership check to the name-based overload.
+        return hasMetastorage(nodeName, members);
     }
// TODO: IGNITE-14691 Temporally solution that should be removed after implementing reactive watches.
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
index deba6d5..b7e757a 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java
@@ -156,7 +156,7 @@ public class IgnitionImpl implements Ignition {
BaselineManager baselineMgr = new BaselineManager(configurationMgr, metaStorageMgr, clusterNetSvc);
// Affinity manager startup.
- new AffinityManager(configurationMgr, metaStorageMgr, baselineMgr, vaultMgr);
+ AffinityManager affinityMgr = new AffinityManager(configurationMgr, metaStorageMgr, baselineMgr, vaultMgr);
SchemaManager schemaMgr = new SchemaManager(configurationMgr);
@@ -165,6 +165,7 @@ public class IgnitionImpl implements Ignition {
configurationMgr,
metaStorageMgr,
schemaMgr,
+ affinityMgr,
raftMgr,
vaultMgr
);
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index bf93ea1..9051426 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -55,6 +55,11 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-affinity</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-baseline</artifactId>
</dependency>
@@ -80,12 +85,6 @@
<!-- Test dependencies -->
<dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-affinity</artifactId>
- <scope>test</scope>
- </dependency>
-
- <dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4d47435..a5c1cc0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -21,21 +21,21 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.apache.ignite.configuration.internal.ConfigurationManager;
-import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
-import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.affinity.AffinityManager;
+import org.apache.ignite.internal.affinity.event.AffinityEvent;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
@@ -46,15 +46,11 @@ import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListene
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
-import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.metastorage.common.Conditions;
import org.apache.ignite.metastorage.common.Key;
import org.apache.ignite.metastorage.common.Operations;
-import org.apache.ignite.metastorage.common.WatchEvent;
-import org.apache.ignite.metastorage.common.WatchListener;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.table.Table;
@@ -77,116 +73,96 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Configuration manager. */
private final ConfigurationManager configurationMgr;
- /** Table creation subscription future. */
- private CompletableFuture<Long> tableCreationSubscriptionFut;
+ /** Raft manager, used to start and stop the raft groups of table partitions. */
+ private final Loza raftMgr;
+
+ /** Schema manager, used to create schema views of the tables. */
+ private final SchemaManager schemaMgr;
+
+ /** Affinity manager: calculates and removes table assignments and publishes affinity events. */
+ private final AffinityManager affMgr;
/** Tables. */
private Map<String, TableImpl> tables = new ConcurrentHashMap<>();
- /*
+ /**
+ * Creates a new table manager.
+ *
* @param configurationMgr Configuration manager.
* @param metaStorageMgr Meta storage manager.
- * @param schemaManager Schema manager.
+ * @param schemaMgr Schema manager.
+ * @param affMgr Affinity manager.
* @param raftMgr Raft manager.
* @param vaultManager Vault manager.
*/
public TableManager(
ConfigurationManager configurationMgr,
MetaStorageManager metaStorageMgr,
- SchemaManager schemaManager,
+ SchemaManager schemaMgr,
+ AffinityManager affMgr,
Loza raftMgr,
VaultManager vaultManager
) {
this.configurationMgr = configurationMgr;
this.metaStorageMgr = metaStorageMgr;
+ this.affMgr = affMgr;
+ this.raftMgr = raftMgr;
+ this.schemaMgr = schemaMgr;
- String localNodeName = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
- .name().value();
-
- configurationMgr.configurationRegistry().getConfiguration(ClusterConfiguration.KEY)
- .metastorageNodes().listen(ctx -> {
- if (ctx.newValue() != null) {
- if (MetaStorageManager.hasMetastorageLocally(localNodeName, ctx.newValue()))
- subscribeForTableCreation();
- else
- unsubscribeForTableCreation();
- }
- return CompletableFuture.completedFuture(null);
-
- });
-
- String[] metastorageMembers = configurationMgr.configurationRegistry().getConfiguration(NodeConfiguration.KEY)
- .metastorageNodes().value();
-
- if (MetaStorageManager.hasMetastorageLocally(localNodeName, metastorageMembers))
- subscribeForTableCreation();
-
- String tableInternalPrefix = INTERNAL_PREFIX + "assignment.";
-
- tableCreationSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
- @Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
- for (WatchEvent evt : events) {
- String placeholderValue = evt.newEntry().key().toString().substring(tableInternalPrefix.length() - 1);
-
- String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + placeholderValue))
- .join().value(), StandardCharsets.UTF_8);
-
- UUID tblId = UUID.fromString(placeholderValue);
-
- if (evt.newEntry().value() == null) {
- assert evt.oldEntry().value() != null : "Previous assignment is unknown";
-
- List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
- evt.oldEntry().value());
-
- int partitions = assignment.size();
+ listenForTableChange();
+ }
- for (int p = 0; p < partitions; p++)
- raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
+ /**
+ * Creates local structures for a table.
+ *
+ * @param name Table name.
+ * @param tblId Table id.
+ * @param assignment Affinity assignment.
+ */
+ private void createTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
+ int partitions = assignment.size();
- TableImpl table = tables.get(name);
+ HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
- assert table != null : "There is no table with the name specified [name=" + name + ']';
+ for (int p = 0; p < partitions; p++) {
+ partitionMap.put(p, raftMgr.startRaftGroup(
+ raftGroupName(tblId, p),
+ assignment.get(p),
+ new PartitionCommandListener()
+ ));
+ }
- onEvent(TableEvent.DROP, new TableEventParameters(
- tblId,
- name,
- table.schemaView(),
- table.internalTable()
- ), null);
- }
- else if (evt.newEntry().value().length > 0) {
- List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
- evt.newEntry().value());
+ onEvent(TableEvent.CREATE, new TableEventParameters(
+ tblId,
+ name,
+ new TableSchemaViewImpl(tblId, schemaMgr),
+ new InternalTableImpl(tblId, partitionMap, partitions)
+ ), null);
+ }
- int partitions = assignment.size();
+ /**
+ * Drops local structures for a table.
+ *
+ * @param name Table name.
+ * @param tblId Table id.
+ * @param assignment Affinity assignment.
+ */
+ private void dropTableLocally(String name, UUID tblId, List<List<ClusterNode>> assignment) {
+ int partitions = assignment.size();
- HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
+ for (int p = 0; p < partitions; p++)
+ raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
- for (int p = 0; p < partitions; p++) {
- partitionMap.put(p, raftMgr.startRaftGroup(
- raftGroupName(tblId, p),
- assignment.get(p),
- new PartitionCommandListener()
- ));
- }
+ TableImpl table = tables.get(name);
- onEvent(TableEvent.CREATE, new TableEventParameters(
- tblId,
- name,
- new TableSchemaViewImpl(tblId, schemaManager),
- new InternalTableImpl(tblId, partitionMap, partitions)
- ), null);
- }
- }
+ assert table != null : "There is no table with the name specified [name=" + name + ']';
- return true;
- }
-
- @Override public void onError(@NotNull Throwable e) {
- LOG.error("Metastorage listener issue", e);
- }
- });
+ onEvent(TableEvent.DROP, new TableEventParameters(
+ tblId,
+ name,
+ table.schemaView(),
+ table.internalTable()
+ ), null);
}
/**
@@ -201,14 +177,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
/**
- * Subscribes on table create.
+ * Listens on a drop or create table.
*/
- private void subscribeForTableCreation() {
+ private void listenForTableChange() {
//TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
- configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
- .tables().listen(ctx -> {
- Set<String> tablesToStart = ctx.newValue().namedListKeys() == null ?
- Collections.EMPTY_SET : ctx.newValue().namedListKeys();
+ configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
+ Set<String> tablesToStart = (ctx.newValue() == null || ctx.newValue().namedListKeys() == null) ?
+ Collections.emptySet() : new HashSet<>(ctx.newValue().namedListKeys());
tablesToStart.removeAll(ctx.oldValue().namedListKeys());
@@ -216,24 +191,46 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
List<CompletableFuture<Boolean>> futs = new ArrayList<>();
+ boolean hasMetastorageLocally = MetaStorageManager.hasMetastorageLocally(configurationMgr);
+
for (String tblName : tablesToStart) {
TableView tableView = ctx.newValue().get(tblName);
- long update = 0;
-
- UUID tblId = new UUID(revision, update);
-
- var key = new Key(INTERNAL_PREFIX + tblId.toString());
- futs.add(metaStorageMgr.invoke(
- Conditions.key(key).value().eq(null),
- Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
- Operations.noop()).thenCompose(res ->
- res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0])
- .thenApply(v -> true)
- : CompletableFuture.completedFuture(false)));
+
+ UUID tblId = new UUID(revision, 0L);
+
+ if (hasMetastorageLocally) {
+ var key = new Key(INTERNAL_PREFIX + tblId.toString());
+
+ futs.add(metaStorageMgr.invoke(
+ Conditions.key(key).value().eq(null),
+ Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)),
+ Operations.noop()).thenCompose(res ->
+ affMgr.calculateAssignments(tblId)));
+ }
+
+ affMgr.listen(AffinityEvent.CALCULATED, (parameters, e) -> {
+ if (!tblId.equals(parameters.tableId()))
+ return false;
+
+ if (e == null)
+ createTableLocally(tblName, tblId, parameters.assignment());
+ else {
+ LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);
+
+ onEvent(TableEvent.CREATE, new TableEventParameters(
+ tblId,
+ tblName,
+ null,
+ null
+ ), e);
+ }
+
+ return true;
+ });
}
- Set<String> tablesToStop = ctx.oldValue().namedListKeys() == null ?
- Collections.EMPTY_SET : ctx.oldValue().namedListKeys();
+ Set<String> tablesToStop = (ctx.oldValue() == null || ctx.oldValue().namedListKeys() == null) ?
+ Collections.emptySet() : new HashSet<>(ctx.oldValue().namedListKeys());
tablesToStop.removeAll(ctx.newValue().namedListKeys());
@@ -242,37 +239,38 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
UUID tblId = t.internalTable().tableId();
- var key = new Key(INTERNAL_PREFIX + "assignment." + tblId.toString());
- futs.add(metaStorageMgr.invoke(
- Conditions.key(key).value().ne(null),
- Operations.remove(key),
- Operations.noop()).thenCompose(res ->
- res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId.toString()))
- .thenApply(v -> true)
- : CompletableFuture.completedFuture(false)));
- }
+ if (hasMetastorageLocally) {
+ var key = new Key(INTERNAL_PREFIX + tblId.toString());
- return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
- });
- }
+ futs.add(affMgr.removeAssignment(tblId).thenCompose(res ->
+ metaStorageMgr.invoke(Conditions.key(key).value().ne(null),
+ Operations.remove(key),
+ Operations.noop())));
+ }
- /**
- * Unsubscribe from table creation.
- */
- private void unsubscribeForTableCreation() {
- if (tableCreationSubscriptionFut == null)
- return;
+ affMgr.listen(AffinityEvent.REMOVED, (parameters, e) -> {
+ if (!tblId.equals(parameters.tableId()))
+ return false;
- try {
- Long subscriptionId = tableCreationSubscriptionFut.get();
+ if (e == null)
+ dropTableLocally(tblName, tblId, parameters.assignment());
+ else {
+ LOG.error("Failed to drop a table [name=" + tblName + ", id=" + tblId + ']', e);
- metaStorageMgr.unregisterWatch(subscriptionId);
+ onEvent(TableEvent.DROP, new TableEventParameters(
+ tblId,
+ tblName,
+ null,
+ null
+ ), e);
+ }
- tableCreationSubscriptionFut = null;
- }
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Couldn't unsubscribe from table creation", e);
- }
+ return true;
+ });
+ }
+
+ return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
+ });
}
/** {@inheritDoc} */
@@ -327,10 +325,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
}
});
- configurationMgr.configurationRegistry()
- .getConfiguration(TablesConfiguration.KEY).tables().change(change -> {
- change.delete(name);
- });
+ configurationMgr
+ .configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY)
+ .tables()
+ .change(change -> change.delete(name));
dropTblFut.join();
}