You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/05/04 09:47:12 UTC
[ignite-3] branch main updated: IGNITE-14238 Creating and
destroying tables. Fixes #112
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 26051c4 IGNITE-14238 Creating and destroying tables. Fixes #112
26051c4 is described below
commit 26051c4bb693b17e959214b403de14e474b5d306
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Tue May 4 12:46:38 2021 +0300
IGNITE-14238 Creating and destroying tables. Fixes #112
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../ignite/internal/affinity/AffinityManager.java | 7 +-
.../apache/ignite/table/manager/IgniteTables.java | 7 +
.../org/apache/ignite/internal/manager/Event.java | 25 +++
.../ignite/internal/manager/EventParameters.java | 26 +++
.../apache/ignite/internal/manager/Producer.java | 69 ++++++
.../apache/ignite/internal/util/ArrayUtils.java | 5 -
.../java/org/apache/ignite/internal/raft/Loza.java | 18 +-
.../ignite/internal/table/InternalTable.java | 8 +
.../apache/ignite/internal/table/TableImpl.java | 18 ++
.../ignite/internal/table/TableSchemaViewImpl.java | 52 +++++
.../internal/table/distributed/TableManager.java | 239 +++++++++++++--------
.../distributed/storage/InternalTableImpl.java | 13 ++
.../ignite/internal/table/event/TableEvent.java | 31 +++
.../internal/table/event/TableEventParameters.java | 95 ++++++++
.../ignite/table/impl/DummyInternalTableImpl.java | 6 +
15 files changed, 522 insertions(+), 97 deletions(-)
diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
index 937af33..82df3f8 100644
--- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
+++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java
@@ -27,7 +27,6 @@ import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
@@ -134,7 +133,9 @@ public class AffinityManager {
affinityCalculateSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
@Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
for (WatchEvent evt : events) {
- if (ArrayUtils.empty(evt.newEntry().value())) {
+ byte[] assignmentVal = evt.newEntry().value();
+
+ if (assignmentVal != null && assignmentVal.length == 0) {
String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
String placeholderValue = keyTail.substring(0, keyTail.indexOf('.'));
@@ -150,7 +151,7 @@ public class AffinityManager {
.tables().get(name).replicas().value();
metaStorageMgr.invoke(evt.newEntry().key(),
- Conditions.value().eq(evt.newEntry().value()),
+ Conditions.value().eq(assignmentVal),
Operations.put(ByteUtils.toBytes(
RendezvousAffinityFunction.assignPartitions(
baselineMgr.nodes(),
diff --git a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
index 24b20f6..780e9f1 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
@@ -40,6 +40,13 @@ public interface IgniteTables {
Table createTable(String name, Consumer<TableChange> tableInitChange);
/**
+ * Drops a table with the name specified.
+ *
+ * @param name Table name.
+ */
+ void dropTable(String name);
+
+ /**
* Gets a list of all started tables.
*
* @return List of tables.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
new file mode 100644
index 0000000..c245b5b
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.manager;
+
+/**
+ * The event class which is produced by an event producer component.
+ * @see Producer#onEvent(Event, EventParameters, Exception)
+ */
+public interface Event {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
new file mode 100644
index 0000000..140047d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.manager;
+
+/**
+ * Event parameters.
+ * This type is passed to the event listener.
+ * @see Producer#onEvent(Event, EventParameters, Exception)
+ */
+public interface EventParameters {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
new file mode 100644
index 0000000..c198bba
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.manager;
+
+import java.util.Iterator;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.function.BiPredicate;
+
+/**
+ * Base class for components that produce events and notify the registered listeners.
+ */
+public abstract class Producer<T extends Event, P extends EventParameters> {
+ /** All listeners. */
+ private ConcurrentHashMap<T, ConcurrentLinkedQueue<BiPredicate<P, Exception>>> listeners = new ConcurrentHashMap<>();
+
+ /**
+ * Registers an event listener.
+ * When the event predicate returns {@code true}, it is unsubscribed and never invoked again;
+ * otherwise the predicate stays registered and receives the next event as well.
+ *
+ * @param evt Event.
+ * @param closure Closure.
+ */
+ public void listen(T evt, BiPredicate<P, Exception> closure) {
+ listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>())
+ .offer(closure);
+ }
+
+ /**
+ * Notifies every listener that subscribed before.
+ *
+ * @param evt Event type.
+ * @param params Event parameters.
+ * @param err Exception if one occurred, or {@code null} otherwise.
+ */
+ protected void onEvent(T evt, P params, Exception err) {
+ ConcurrentLinkedQueue<BiPredicate<P, Exception>> queue = listeners.get(evt);
+
+ if (queue == null)
+ return;
+
+ BiPredicate<P, Exception> closure;
+
+ Iterator<BiPredicate<P, Exception>> iter = queue.iterator();
+
+ while (iter.hasNext()) {
+ closure = iter.next();
+
+ if (closure.test(params, err))
+ iter.remove();
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
index dcbb3d4..0457241 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ArrayUtils.java
@@ -179,11 +179,6 @@ public final class ArrayUtils {
}
};
- /** */
- public static boolean empty(byte[] arr) {
- return arr == null || arr.length == 0;
- }
-
/**
* Stub.
*/
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 3d2ce2c..3444091 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -70,9 +70,10 @@ public class Loza {
* @return A RAFT group client.
*/
public RaftGroupService startRaftGroup(String groupId, List<ClusterNode> peers, RaftGroupCommandListener lsnr) {
- assert peers.size() > 1;
+ assert !peers.isEmpty();
//Now we are using only one node in a raft group.
+ //TODO: IGNITE-13885 Investigate jraft implementation for replication framework based on RAFT protocol.
if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name()))
raftServer.setListener(groupId, lsnr);
@@ -86,4 +87,19 @@ public class Loza {
DELAY
);
}
+
+ /**
+ * Stops a RAFT group.
+ *
+ * @param groupId RAFT group id.
+ * @param peers Group peers.
+ */
+ public void stopRaftGroup(String groupId, List<ClusterNode> peers) {
+ assert !peers.isEmpty();
+
+ //Now we are using only one node in a raft group.
+ //TODO: IGNITE-13885 Investigate jraft implementation for replication framework based on RAFT protocol.
+ if (peers.get(0).name().equals(clusterNetSvc.topologyService().localMember().name()))
+ raftServer.clearListener(groupId);
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 1bddfed..4134e33 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.table;
import java.util.Collection;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.schema.BinaryRow;
import org.jetbrains.annotations.NotNull;
@@ -28,6 +29,13 @@ import org.jetbrains.annotations.NotNull;
*/
public interface InternalTable {
/**
+ * Gets a table id.
+ *
+ * @return Table id as UUID.
+ */
+ @NotNull UUID tableId();
+
+ /**
* Asynchronously gets a row with same key columns values as given one from the table.
*
* @param keyRow Row with key columns set.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index cc70462..3fd11de 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -57,6 +57,24 @@ public class TableImpl extends AbstractTableView implements Table {
marsh = new TupleMarshallerImpl(schemaMgr);
}
+ /**
+ * Gets an internal table associated with the table.
+ *
+ * @return Internal table.
+ */
+ public @NotNull InternalTable internalTable() {
+ return tbl;
+ }
+
+ /**
+ * Gets a schema view for the table.
+ *
+ * @return Schema view.
+ */
+ public TableSchemaView schemaView() {
+ return schemaMgr;
+ }
+
/** {@inheritDoc} */
@Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
return new RecordViewImpl<>(tbl, schemaMgr, recMapper);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
new file mode 100644
index 0000000..dd74868
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaViewImpl.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.UUID;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+
+/**
+ * Schema view implementation.
+ */
+public class TableSchemaViewImpl implements TableSchemaView {
+ /** Table identifier. */
+ private final UUID tableId;
+
+ /** Schema manager. */
+ private final SchemaManager schemaManager;
+
+ /**
+ * @param tableId Table identifier.
+ * @param schemaManager Schema manager.
+ */
+ public TableSchemaViewImpl(UUID tableId, SchemaManager schemaManager) {
+ this.tableId = tableId;
+ this.schemaManager = schemaManager;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor schema() {
+ return schemaManager.schema(tableId);
+ }
+
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor schema(int ver) {
+ return schemaManager.schema(tableId, ver);
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 86a04e4..1d65043 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -19,13 +19,16 @@ package org.apache.ignite.internal.table.distributed;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
@@ -33,15 +36,16 @@ import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.raft.Loza;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.internal.table.TableSchemaView;
+import org.apache.ignite.internal.table.TableSchemaViewImpl;
import org.apache.ignite.internal.table.distributed.raft.PartitionCommandListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
-import org.apache.ignite.internal.util.ArrayUtils;
+import org.apache.ignite.internal.table.event.TableEvent;
+import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
@@ -60,7 +64,7 @@ import org.jetbrains.annotations.NotNull;
/**
* Table manager.
*/
-public class TableManager implements IgniteTables {
+public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class);
@@ -77,13 +81,14 @@ public class TableManager implements IgniteTables {
private CompletableFuture<Long> tableCreationSubscriptionFut;
/** Tables. */
- private Map<String, Table> tables;
+ private Map<String, TableImpl> tables = new ConcurrentHashMap<>();
- /**
+ /*
* @param configurationMgr Configuration manager.
* @param metaStorageMgr Meta storage manager.
* @param schemaManager Schema manager.
* @param raftMgr Raft manager.
+ * @param vaultManager Vault manager.
*/
public TableManager(
ConfigurationManager configurationMgr,
@@ -92,8 +97,6 @@ public class TableManager implements IgniteTables {
Loza raftMgr,
VaultManager vaultManager
) {
- tables = new HashMap<>();
-
this.configurationMgr = configurationMgr;
this.metaStorageMgr = metaStorageMgr;
@@ -123,52 +126,57 @@ public class TableManager implements IgniteTables {
tableCreationSubscriptionFut = metaStorageMgr.registerWatchByPrefix(new Key(tableInternalPrefix), new WatchListener() {
@Override public boolean onUpdate(@NotNull Iterable<WatchEvent> events) {
for (WatchEvent evt : events) {
- if (!ArrayUtils.empty(evt.newEntry().value())) {
- String keyTail = evt.newEntry().key().toString().substring(INTERNAL_PREFIX.length());
-
- String placeholderValue = keyTail.substring(0, keyTail.indexOf('.'));
-
- UUID tblId = UUID.fromString(placeholderValue);
-
- try {
- String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8);
-
- int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
- .tables().get(name).partitions().value();
-
- List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
- evt.newEntry().value());
-
- HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
-
- for (int p = 0; p < partitions; p++) {
- partitionMap.put(p, raftMgr.startRaftGroup(
- name + "_part_" + p,
- assignment.get(p),
- new PartitionCommandListener()
- ));
- }
-
- tables.put(name, new TableImpl(
- new InternalTableImpl(
- tblId,
- partitionMap,
- partitions
- ),
- new TableSchemaView() {
- @Override public SchemaDescriptor schema() {
- return schemaManager.schema(tblId);
- }
-
- @Override public SchemaDescriptor schema(int ver) {
- return schemaManager.schema(tblId, ver);
- }
- }));
- }
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Failed to start table [key={}]",
- evt.newEntry().key(), e);
+ String placeholderValue = evt.newEntry().key().toString().substring(tableInternalPrefix.length() - 1);
+
+ String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + placeholderValue))
+ .join().value(), StandardCharsets.UTF_8);
+
+ UUID tblId = UUID.fromString(placeholderValue);
+
+ if (evt.newEntry().value() == null) {
+ assert evt.oldEntry().value() != null : "Previous assignment is unknown";
+
+ List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
+ evt.oldEntry().value());
+
+ int partitions = assignment.size();
+
+ for (int p = 0; p < partitions; p++)
+ raftMgr.stopRaftGroup(raftGroupName(tblId, p), assignment.get(p));
+
+ TableImpl table = tables.get(name);
+
+ assert table != null : "There is no table with the name specified [name=" + name + ']';
+
+ onEvent(TableEvent.DROP, new TableEventParameters(
+ tblId,
+ name,
+ table.schemaView(),
+ table.internalTable()
+ ), null);
+ }
+ else if (evt.newEntry().value().length > 0) {
+ List<List<ClusterNode>> assignment = (List<List<ClusterNode>>)ByteUtils.fromBytes(
+ evt.newEntry().value());
+
+ int partitions = assignment.size();
+
+ HashMap<Integer, RaftGroupService> partitionMap = new HashMap<>(partitions);
+
+ for (int p = 0; p < partitions; p++) {
+ partitionMap.put(p, raftMgr.startRaftGroup(
+ raftGroupName(tblId, p),
+ assignment.get(p),
+ new PartitionCommandListener()
+ ));
}
+
+ onEvent(TableEvent.CREATE, new TableEventParameters(
+ tblId,
+ name,
+ new TableSchemaViewImpl(tblId, schemaManager),
+ new InternalTableImpl(tblId, partitionMap, partitions)
+ ), null);
}
}
@@ -182,6 +190,17 @@ public class TableManager implements IgniteTables {
}
/**
+ * Compounds a RAFT group unique name.
+ *
+ * @param tableId Table identifier.
+ * @param partition Table partition number.
+ * @return A RAFT group name.
+ */
+ @NotNull private String raftGroupName(UUID tableId, int partition) {
+ return tableId + "_part_" + partition;
+ }
+
+ /**
* Checks whether the local node hosts Metastorage.
*
* @param localNodeName Local node uniq name.
@@ -208,39 +227,52 @@ public class TableManager implements IgniteTables {
//TODO: IGNITE-14652 Change a metastorage update in listeners to multi-invoke
configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
.tables().listen(ctx -> {
- HashSet<String> tblNamesToStart = new HashSet<>(ctx.newValue().namedListKeys());
+ Set<String> tablesToStart = ctx.newValue().namedListKeys() == null ?
+ Collections.EMPTY_SET : ctx.newValue().namedListKeys();
+
+ tablesToStart.removeAll(ctx.oldValue().namedListKeys());
long revision = ctx.storageRevision();
- if (ctx.oldValue() != null)
- tblNamesToStart.removeAll(ctx.oldValue().namedListKeys());
+ List<CompletableFuture<Boolean>> futs = new ArrayList<>();
- for (String tblName : tblNamesToStart) {
+ for (String tblName : tablesToStart) {
TableView tableView = ctx.newValue().get(tblName);
long update = 0;
UUID tblId = new UUID(revision, update);
- CompletableFuture<Boolean> fut = metaStorageMgr.invoke(
+ futs.add(metaStorageMgr.invoke(
new Key(INTERNAL_PREFIX + tblId.toString()),
Conditions.value().eq(null),
Operations.put(tableView.name().getBytes(StandardCharsets.UTF_8)),
- Operations.noop());
+ Operations.noop()).thenCompose(res ->
+ res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0])
+ .thenApply(v -> true)
+ : CompletableFuture.completedFuture(false)));
+ }
- try {
- if (fut.get()) {
- metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0]);
+ Set<String> tablesToStop = ctx.oldValue().namedListKeys() == null ?
+ Collections.EMPTY_SET : ctx.oldValue().namedListKeys();
- LOG.info("Table manager created a table [name={}, revision={}]",
- tableView.name(), revision);
- }
- }
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Table was not fully initialized [name={}, revision={}]",
- tableView.name(), revision, e);
- }
+ tablesToStop.removeAll(ctx.newValue().namedListKeys());
+
+ for (String tblName : tablesToStop) {
+ TableImpl t = tables.get(tblName);
+
+ UUID tblId = t.internalTable().tableId();
+
+ futs.add(metaStorageMgr.invoke(
+ new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()),
+ Conditions.value().ne(null),
+ Operations.remove(),
+ Operations.noop()).thenCompose(res ->
+ res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId.toString()))
+ .thenApply(v -> true)
+ : CompletableFuture.completedFuture(false)));
}
- return CompletableFuture.completedFuture(null);
+
+ return CompletableFuture.allOf(futs.toArray(CompletableFuture[]::new));
});
}
@@ -265,31 +297,62 @@ public class TableManager implements IgniteTables {
/** {@inheritDoc} */
@Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
+ CompletableFuture<Table> tblFut = new CompletableFuture<>();
+
+ listen(TableEvent.CREATE, (params, e) -> {
+ String tableName = params.tableName();
+
+ if (!name.equals(tableName))
+ return false;
+
+ if (e == null) {
+ tblFut.complete(tables.compute(tableName, (key, val) ->
+ new TableImpl(params.internalTable(), params.tableSchemaView())));
+ }
+ else
+ tblFut.completeExceptionally(e);
+
+ return true;
+ });
+
configurationMgr.configurationRegistry()
.getConfiguration(TablesConfiguration.KEY).tables().change(change ->
change.create(name, tableInitChange));
-// this.createTable("tbl1", change -> {
-// change.initReplicas(2);
-// change.initName("tbl1");
-// change.initPartitions(1_000);
-// });
+ return tblFut.join();
+ }
- //TODO: IGNITE-14646 Support asynchronous table creation
- Table tbl = null;
+ /** {@inheritDoc} */
+ @Override public void dropTable(String name) {
+ CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
- while (tbl == null) {
- try {
- Thread.sleep(50);
+ listen(TableEvent.DROP, new BiPredicate<TableEventParameters, Exception>() {
+ @Override public boolean test(TableEventParameters params, Exception e) {
+ String tableName = params.tableName();
- tbl = table(name);
- }
- catch (InterruptedException e) {
- LOG.error("Waiting of creation of table was interrupted.", e);
+ if (!name.equals(tableName))
+ return false;
+
+ if (e == null) {
+ Table droppedTable = tables.remove(tableName);
+
+ assert droppedTable != null;
+
+ dropTblFut.complete(null);
+ }
+ else
+ dropTblFut.completeExceptionally(e);
+
+ return true;
}
- }
+ });
+
+ configurationMgr.configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY).tables().change(change -> {
+ change.delete(name);
+ });
- return tbl;
+ dropTblFut.join();
}
/** {@inheritDoc} */
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 2ff5e8b..c7dcfa3 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -42,6 +42,9 @@ public class InternalTableImpl implements InternalTable {
/** Partitions. */
private int partitions;
+ /** Table identifier. */
+ private UUID tableId;
+
/**
* @param tableId Table id.
* @param partMap Map partition id to raft group.
@@ -52,10 +55,20 @@ public class InternalTableImpl implements InternalTable {
Map<Integer, RaftGroupService> partMap,
int partitions
) {
+ this.tableId = tableId;
this.partitionMap = partMap;
this.partitions = partitions;
}
+ /**
+ * Gets a table id.
+ *
+ * @return Table id as UUID.
+ */
+ @Override public @NotNull UUID tableId() {
+ return tableId;
+ }
+
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
return partitionMap.get(keyRow.hash() % partitions).<KVGetResponse>run(new GetCommand(keyRow))
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
new file mode 100644
index 0000000..f2a737e
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.event;
+
+import org.apache.ignite.internal.manager.Event;
+
+/**
+ * Table management events.
+ */
+public enum TableEvent implements Event {
+ /** This event is fired when a table was created. */
+ CREATE,
+
+ /** This event is fired when a table was dropped. */
+ DROP
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
new file mode 100644
index 0000000..853617d
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.event;
+
+import java.util.UUID;
+import org.apache.ignite.internal.manager.EventParameters;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableSchemaView;
+
+/**
+ * Table event parameters.
+ * These are the properties associated with a concrete table.
+ */
+public class TableEventParameters implements EventParameters {
+ /** Table identifier. */
+ private final UUID tableId;
+
+ /** Table name. */
+ private final String tableName;
+
+ /** Table schema view. */
+ private final TableSchemaView tableSchemaView;
+
+ /** Internal table. */
+ private final InternalTable internalTable;
+
+ /**
+ * @param tableId Table identifier.
+ * @param tableName Table name.
+ * @param tableSchemaView Table schema view.
+ * @param internalTable Internal table.
+ */
+ public TableEventParameters(
+ UUID tableId,
+ String tableName,
+ TableSchemaView tableSchemaView,
+ InternalTable internalTable
+ ) {
+ this.tableId = tableId;
+ this.tableName = tableName;
+ this.tableSchemaView = tableSchemaView;
+ this.internalTable = internalTable;
+ }
+
+ /**
+ * Gets the table identifier.
+ *
+ * @return Table id.
+ */
+ public UUID tableId() {
+ return tableId;
+ }
+
+ /**
+ * Gets the table name.
+ *
+ * @return Table name.
+ */
+ public String tableName() {
+ return tableName;
+ }
+
+ /**
+ * Gets a schema view for the table.
+ *
+ * @return Schema view.
+ */
+ public TableSchemaView tableSchemaView() {
+ return tableSchemaView;
+ }
+
+ /**
+ * Gets an internal table associated with the table.
+ *
+ * @return Internal table.
+ */
+ public InternalTable internalTable() {
+ return internalTable;
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
index 5e9a352..23d46c4 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -77,6 +78,11 @@ public class DummyInternalTableImpl implements InternalTable {
}
/** {@inheritDoc} */
+ @Override public @NotNull UUID tableId() {
+ return UUID.randomUUID();
+ }
+
+ /** {@inheritDoc} */
@Override public CompletableFuture<BinaryRow> get(@NotNull BinaryRow row) {
assert row != null;