You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/07/12 12:13:43 UTC
[ignite-3] branch main updated: IGNITE-15055 Fixed an issue with
creation a table that already exists. Fixes #200
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d7b295e IGNITE-15055 Fixed an issue with creation a table that already exists. Fixes #200
d7b295e is described below
commit d7b295e60cbbf1f86aa3f2b1e871573204c321df
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Mon Jul 12 15:13:05 2021 +0300
IGNITE-15055 Fixed an issue with creation a table that already exists. Fixes #200
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../apache/ignite/table/manager/IgniteTables.java | 15 ++-
.../ignite/internal/manager/EventListener.java | 1 -
.../internal/manager/ListenerRemovedException.java | 12 +++
.../apache/ignite/internal/manager/Producer.java | 15 ++-
.../java/org/apache/ignite/internal/raft/Loza.java | 4 +-
.../internal/table/distributed/TableManager.java | 104 +++++++++++++++------
.../ignite/internal/table/TableManagerTest.java | 51 +++++++---
7 files changed, 154 insertions(+), 48 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
index aab5f3e..cf1342e 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
@@ -30,11 +30,12 @@ import org.apache.ignite.table.Table;
*/
public interface IgniteTables {
/**
- * Creates a cluster table.
+ * Creates a new table with the given {@code name}.
+ * If a table with the same name already exists, an exception will be thrown.
*
* @param name Table name.
* @param tableInitChange Table changer.
- * @return Table.
+ * @return Newly created table.
*/
Table createTable(String name, Consumer<TableChange> tableInitChange);
@@ -47,7 +48,17 @@ public interface IgniteTables {
void alterTable(String name, Consumer<TableChange> tableChange);
/**
+ * Creates a new table with the given {@code name} or returns an existing one with the same {@code name}.
+ *
+ * @param name Table name.
+ * @param tableInitChange Table changer.
+ * @return Existing or newly created table.
+ */
+ Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange);
+
+ /**
* Drops a table with the name specified.
+ * If a table with the specified name does not exist in the cluster, the operation has no effect.
*
* @param name Table name.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
index 04e684c..4a07c73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
@@ -25,7 +25,6 @@ import org.jetbrains.annotations.Nullable;
* @see Producer#listen(Event, EventListener)
*/
public interface EventListener<P extends EventParameters> {
-
/**
* Notifies the listener about an event.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java
index bff967a..f486c85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java
@@ -23,4 +23,16 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
* The exception notifies a listener when the listener was removed from queue and never receive a notification again.
*/
public class ListenerRemovedException extends IgniteInternalCheckedException {
+ /**
+ * Default constructor.
+ */
+ public ListenerRemovedException() {
+ }
+
+ /**
+ * @param cause The exception that was a cause which a listener is removed.
+ */
+ public ListenerRemovedException(IgniteInternalCheckedException cause) {
+ super(cause);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
index 296e5a5..fabebfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.manager;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
/**
* Interface which can produce its events.
@@ -48,8 +50,19 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
* @param closure Closure.
*/
public void removeListener(T evt, EventListener<P> closure) {
+ removeListener(evt, closure, null);
+ }
+
+ /**
+ * Removes a listener associated with the event.
+ *
+ * @param evt Event.
+ * @param closure Closure.
+ * @param cause The exception that was a cause which a listener is removed.
+ */
+ public void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause) {
if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure))
- closure.remove(new ListenerRemovedException());
+ closure.remove(cause == null ? new ListenerRemovedException() : new ListenerRemovedException(cause));
}
/**
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index bfa8066..625153a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.raft;
import java.util.List;
import java.util.stream.Collectors;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.raft.client.Peer;
@@ -26,8 +28,6 @@ import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
import org.apache.ignite.raft.client.service.RaftGroupListener;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.internal.raft.server.RaftServer;
-import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
/**
* Best raft manager ever since 1982.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index d250fea..d27adcc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.configuration.ConfigurationManager;
import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
import org.apache.ignite.internal.manager.EventListener;
-import org.apache.ignite.internal.manager.ListenerRemovedException;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.client.Conditions;
@@ -65,7 +64,9 @@ import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.LoggerMessageHelper;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.schema.PrimaryIndex;
@@ -476,9 +477,27 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** {@inheritDoc} */
@Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
+ return createTable(name, tableInitChange, true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange) {
+ return createTable(name, tableInitChange, false);
+ }
+
+ /**
+ * Creates a new table with the specified name or returns an existing table with the same name.
+ *
+ * @param name Table name.
+ * @param tableInitChange Table configuration.
+ * @param exceptionWhenExist If the value is {@code true}, an exception will be thrown when the table already exists,
+ * {@code false} means the existing table will be returned.
+ * @return A table instance.
+ */
+ public Table createTable(String name, Consumer<TableChange> tableInitChange, boolean exceptionWhenExist) {
CompletableFuture<Table> tblFut = new CompletableFuture<>();
- listen(TableEvent.CREATE, new EventListener<>() {
+ EventListener<TableEventParameters> clo = new EventListener<>() {
@Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
String tableName = parameters.tableName();
@@ -496,17 +515,34 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
@Override public void remove(@NotNull Throwable e) {
tblFut.completeExceptionally(e);
}
- });
+ };
- try {
- configurationMgr.configurationRegistry()
- .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
- change.create(name, tableInitChange)).get();
+ listen(TableEvent.CREATE, clo);
+
+ Table tbl = table(name, true);
+
+ if (tbl != null) {
+ if (exceptionWhenExist) {
+ removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(
+ LoggerMessageHelper.format("Table already exists [name={}]", name)));
+ }
+ else if (tblFut.complete(tbl))
+ removeListener(TableEvent.CREATE, clo);
}
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Table wasn't created [name=" + name + ']', e);
+ else {
+ try {
+ configurationMgr
+ .configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY)
+ .tables()
+ .change(change -> change.create(name, tableInitChange))
+ .get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.error("Table wasn't created [name=" + name + ']', e);
- tblFut.completeExceptionally(e);
+ removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(e));
+ }
}
return tblFut.join();
@@ -523,9 +559,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (!name.equals(tableName))
return false;
- if (e == null) {
+ if (e == null)
tblFut.complete(null);
- }
else
tblFut.completeExceptionally(e);
@@ -555,7 +590,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
@Override public void dropTable(String name) {
CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
- listen(TableEvent.DROP, new EventListener<>() {
+ EventListener<TableEventParameters> clo = new EventListener<>() {
@Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
String tableName = parameters.tableName();
@@ -578,19 +613,28 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
@Override public void remove(@NotNull Throwable e) {
dropTblFut.completeExceptionally(e);
}
- });
+ };
- try {
- configurationMgr
- .configurationRegistry()
- .getConfiguration(TablesConfiguration.KEY)
- .tables()
- .change(change -> change.delete(name)).get();
+ listen(TableEvent.DROP, clo);
+
+ if (!isTableConfigured(name)) {
+ if (dropTblFut.complete(null))
+ removeListener(TableEvent.DROP, clo, null);
}
- catch (InterruptedException | ExecutionException e) {
- LOG.error("Table wasn't dropped [name=" + name + ']', e);
+ else {
+ try {
+ configurationMgr
+ .configurationRegistry()
+ .getConfiguration(TablesConfiguration.KEY)
+ .tables()
+ .change(change -> change.delete(name))
+ .get();
+ }
+ catch (InterruptedException | ExecutionException e) {
+ LOG.error("Table wasn't dropped [name=" + name + ']', e);
- dropTblFut.completeExceptionally(e);
+ removeListener(TableEvent.DROP, clo, new IgniteInternalCheckedException(e));
+ }
}
dropTblFut.join();
@@ -670,12 +714,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
EventListener<TableEventParameters> clo = new EventListener<>() {
@Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
- if (e instanceof ListenerRemovedException) {
- getTblFut.completeExceptionally(e);
-
- return true;
- }
-
String tableName = parameters.tableName();
if (!name.equals(tableName))
@@ -700,7 +738,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
if (tbl != null && getTblFut.complete(tbl) ||
!isTableConfigured(name) && getTblFut.complete(null))
- removeListener(TableEvent.CREATE, clo);
+ removeListener(TableEvent.CREATE, clo, null);
return getTblFut.join();
}
@@ -712,7 +750,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
* @return True if table configured, false otherwise.
*/
private boolean isTableConfigured(String name) {
- return metaStorageMgr.get(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(name) + ".name")).join() != null;
+ return metaStorageMgr.invoke(Conditions.exists(
+ new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(name) + ".name")),
+ Operations.noop(),
+ Operations.noop()
+ ).join();
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 64322d7..75e3e36 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.table;
import java.lang.reflect.Method;
-import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -74,6 +73,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -330,6 +330,33 @@ public class TableManagerTest {
}
/**
+ * Tries to create a table that already exists.
+ */
+ @Test
+ public void testDoubledCreateTable() {
+ CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
+
+ SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME)
+ .columns(
+ SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+ SchemaBuilders.column("val", ColumnType.INT64).asNullable().build())
+ .withPrimaryKey("key")
+ .build();
+
+ Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
+
+ assertNotNull(table);
+
+ assertThrows(RuntimeException.class, () -> tblManagerFut.join().createTable(scmTbl.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl, tblCh)
+ .changeReplicas(1)
+ .changePartitions(10)));
+
+ assertSame(table, tblManagerFut.join().getOrCreateTable(scmTbl.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl, tblCh)
+ .changeReplicas(1)
+ .changePartitions(10)));
+ }
+
+ /**
* Instantiates Table manager and creates a table in it.
*
* @param schemaTable Configuration schema for a table.
@@ -360,6 +387,10 @@ public class TableManagerTest {
CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
+ String keyForCheck = PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name";
+
+ AtomicBoolean tableCreatedFlag = new AtomicBoolean();
+
when(mm.invoke(any(Condition.class), any(Operation.class), any(Operation.class))).thenAnswer(invocation -> {
Condition condition = invocation.getArgument(0);
@@ -367,10 +398,12 @@ public class TableManagerTest {
Method getKeyMethod = ReflectionUtils.findMethod(internalCondition.getClass(), "key").get();
- byte[] metastorageKeyBytes = (byte[])ReflectionUtils.invokeMethod(getKeyMethod, internalCondition);
+ String metastorageKey = new String((byte[])ReflectionUtils.invokeMethod(getKeyMethod, internalCondition));
- tblIdFut.complete(UUID.fromString(new String(metastorageKeyBytes, StandardCharsets.UTF_8)
- .substring(INTERNAL_PREFIX.length())));
+ if (keyForCheck.equals(metastorageKey))
+ return CompletableFuture.completedFuture(tableCreatedFlag.get());
+
+ tblIdFut.complete(UUID.fromString(metastorageKey.substring(INTERNAL_PREFIX.length())));
return CompletableFuture.completedFuture(true);
});
@@ -394,7 +427,7 @@ public class TableManagerTest {
when(am.calculateAssignments(any(), eq(schemaTable.canonicalName()))).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invocation -> {
- EventListener<AffinityEventParameters> affinityClaculatedDelegate = invocation.getArgument(1);
+ EventListener<AffinityEventParameters> affinityCalculatedDelegate = invocation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
@@ -403,7 +436,7 @@ public class TableManagerTest {
assertTrue(tblIdFut.isDone());
- CompletableFuture.supplyAsync(() -> affinityClaculatedDelegate.notify(
+ CompletableFuture.supplyAsync(() -> affinityCalculatedDelegate.notify(
new AffinityEventParameters(tblIdFut.join(), assignment),
null));
@@ -414,9 +447,6 @@ public class TableManagerTest {
tblManagerFut.complete(tableManager);
- when(mm.get(eq(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"))))
- .thenReturn(CompletableFuture.completedFuture(null));
-
when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
Cursor<Entry> cursor = mock(Cursor.class);
@@ -437,8 +467,7 @@ public class TableManagerTest {
if (!createTbl && !dropTbl)
return CompletableFuture.completedFuture(null);
- when(mm.get(eq(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"))))
- .thenAnswer(invocation -> CompletableFuture.completedFuture(createTbl ? mock(Entry.class) : null));
+ tableCreatedFlag.set(createTbl);
when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
AtomicBoolean firstRecord = new AtomicBoolean(createTbl);