You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/07/12 12:13:43 UTC

[ignite-3] branch main updated: IGNITE-15055 Fixed an issue with creation a table that already exists. Fixes #200

This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d7b295e  IGNITE-15055 Fixed an issue with creation a table that already exists. Fixes #200
d7b295e is described below

commit d7b295e60cbbf1f86aa3f2b1e871573204c321df
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Mon Jul 12 15:13:05 2021 +0300

    IGNITE-15055 Fixed an issue with creation a table that already exists. Fixes #200
    
    Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
 .../apache/ignite/table/manager/IgniteTables.java  |  15 ++-
 .../ignite/internal/manager/EventListener.java     |   1 -
 .../internal/manager/ListenerRemovedException.java |  12 +++
 .../apache/ignite/internal/manager/Producer.java   |  15 ++-
 .../java/org/apache/ignite/internal/raft/Loza.java |   4 +-
 .../internal/table/distributed/TableManager.java   | 104 +++++++++++++++------
 .../ignite/internal/table/TableManagerTest.java    |  51 +++++++---
 7 files changed, 154 insertions(+), 48 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
index aab5f3e..cf1342e 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/manager/IgniteTables.java
@@ -30,11 +30,12 @@ import org.apache.ignite.table.Table;
  */
 public interface IgniteTables {
     /**
-     * Creates a cluster table.
+     * Creates a new table with the given {@code name}.
+     * If a table with the same name already exists, an exception will be thrown.
      *
      * @param name Table name.
      * @param tableInitChange Table changer.
-     * @return Table.
+     * @return Newly created table.
      */
     Table createTable(String name, Consumer<TableChange> tableInitChange);
 
@@ -47,7 +48,17 @@ public interface IgniteTables {
     void alterTable(String name, Consumer<TableChange> tableChange);
 
     /**
+     * Creates a new table with the given {@code name} or returns an existing one with the same {@code name}.
+     *
+     * @param name Table name.
+     * @param tableInitChange Table changer.
+     * @return Existing or newly created table.
+     */
+    Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange);
+
+    /**
      * Drops a table with the name specified.
+     * If a table with the specified name does not exist in the cluster, the operation has no effect.
      *
      * @param name Table name.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
index 04e684c..4a07c73 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
@@ -25,7 +25,6 @@ import org.jetbrains.annotations.Nullable;
  * @see Producer#listen(Event, EventListener)
  */
 public interface EventListener<P extends EventParameters> {
-
     /**
      * Notifies the listener about an event.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java
index bff967a..f486c85 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java
@@ -23,4 +23,16 @@ import org.apache.ignite.lang.IgniteInternalCheckedException;
  * The exception notifies a listener when the listener was removed from queue and never receive a notification again.
  */
 public class ListenerRemovedException extends IgniteInternalCheckedException {
+    /**
+     * Default constructor.
+     */
+    public ListenerRemovedException() {
+    }
+
+    /**
+     * @param cause The exception that caused the listener to be removed.
+     */
+    public ListenerRemovedException(IgniteInternalCheckedException cause) {
+        super(cause);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
index 296e5a5..fabebfd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -20,6 +20,8 @@ package org.apache.ignite.internal.manager;
 import java.util.Iterator;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Interface which can produce its events.
@@ -48,8 +50,19 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
      * @param closure Closure.
      */
     public void removeListener(T evt, EventListener<P> closure) {
+        removeListener(evt, closure, null);
+    }
+
+    /**
+     * Removes a listener associated with the event.
+     *
+     * @param evt Event.
+     * @param closure Closure.
+     * @param cause The exception that caused the listener to be removed, or {@code null} if there is none.
+     */
+    public void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause) {
         if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure))
-            closure.remove(new ListenerRemovedException());
+            closure.remove(cause == null ? new ListenerRemovedException() : new ListenerRemovedException(cause));
     }
 
     /**
diff --git a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index bfa8066..625153a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.raft;
 
 import java.util.List;
 import java.util.stream.Collectors;
+import org.apache.ignite.internal.raft.server.RaftServer;
+import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 import org.apache.ignite.raft.client.Peer;
@@ -26,8 +28,6 @@ import org.apache.ignite.raft.client.message.RaftClientMessagesFactory;
 import org.apache.ignite.raft.client.service.RaftGroupListener;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
-import org.apache.ignite.internal.raft.server.RaftServer;
-import org.apache.ignite.internal.raft.server.impl.RaftServerImpl;
 
 /**
  * Best raft manager ever since 1982.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index d250fea..d27adcc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -44,7 +44,6 @@ import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
 import org.apache.ignite.internal.configuration.ConfigurationManager;
 import org.apache.ignite.internal.configuration.util.ConfigurationUtil;
 import org.apache.ignite.internal.manager.EventListener;
-import org.apache.ignite.internal.manager.ListenerRemovedException;
 import org.apache.ignite.internal.manager.Producer;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.client.Conditions;
@@ -65,7 +64,9 @@ import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.lang.ByteArray;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
 import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.LoggerMessageHelper;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.raft.client.service.RaftGroupService;
 import org.apache.ignite.schema.PrimaryIndex;
@@ -476,9 +477,27 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
     /** {@inheritDoc} */
     @Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
+        return createTable(name, tableInitChange, true);
+    }
+
+    /** {@inheritDoc} */
+    @Override public Table getOrCreateTable(String name, Consumer<TableChange> tableInitChange) {
+        return createTable(name, tableInitChange, false);
+    }
+
+    /**
+     * Creates a new table with the specified name or returns an existing table with the same name.
+     *
+     * @param name Table name.
+     * @param tableInitChange Table configuration.
+     * @param exceptionWhenExist If {@code true}, an exception is thrown when the table already exists;
+     * if {@code false}, the existing table is returned.
+     * @return A table instance.
+     */
+    public Table createTable(String name, Consumer<TableChange> tableInitChange, boolean exceptionWhenExist) {
         CompletableFuture<Table> tblFut = new CompletableFuture<>();
 
-        listen(TableEvent.CREATE, new EventListener<>() {
+        EventListener<TableEventParameters> clo = new EventListener<>() {
             @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
                 String tableName = parameters.tableName();
 
@@ -496,17 +515,34 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             @Override public void remove(@NotNull Throwable e) {
                 tblFut.completeExceptionally(e);
             }
-        });
+        };
 
-        try {
-            configurationMgr.configurationRegistry()
-                .getConfiguration(TablesConfiguration.KEY).tables().change(change ->
-                change.create(name, tableInitChange)).get();
+        listen(TableEvent.CREATE, clo);
+
+        Table tbl = table(name, true);
+
+        if (tbl != null) {
+            if (exceptionWhenExist) {
+                removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(
+                    LoggerMessageHelper.format("Table already exists [name={}]", name)));
+            }
+            else if (tblFut.complete(tbl))
+                removeListener(TableEvent.CREATE, clo);
         }
-        catch (InterruptedException | ExecutionException e) {
-            LOG.error("Table wasn't created [name=" + name + ']', e);
+        else {
+            try {
+                configurationMgr
+                    .configurationRegistry()
+                    .getConfiguration(TablesConfiguration.KEY)
+                    .tables()
+                    .change(change -> change.create(name, tableInitChange))
+                    .get();
+            }
+            catch (InterruptedException | ExecutionException e) {
+                LOG.error("Table wasn't created [name=" + name + ']', e);
 
-            tblFut.completeExceptionally(e);
+                removeListener(TableEvent.CREATE, clo, new IgniteInternalCheckedException(e));
+            }
         }
 
         return tblFut.join();
@@ -523,9 +559,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
                 if (!name.equals(tableName))
                     return false;
 
-                if (e == null) {
+                if (e == null)
                     tblFut.complete(null);
-                }
                 else
                     tblFut.completeExceptionally(e);
 
@@ -555,7 +590,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
     @Override public void dropTable(String name) {
         CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
 
-        listen(TableEvent.DROP, new EventListener<>() {
+        EventListener<TableEventParameters> clo = new EventListener<>() {
             @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
                 String tableName = parameters.tableName();
 
@@ -578,19 +613,28 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
             @Override public void remove(@NotNull Throwable e) {
                 dropTblFut.completeExceptionally(e);
             }
-        });
+        };
 
-        try {
-            configurationMgr
-                .configurationRegistry()
-                .getConfiguration(TablesConfiguration.KEY)
-                .tables()
-                .change(change -> change.delete(name)).get();
+        listen(TableEvent.DROP, clo);
+
+        if (!isTableConfigured(name)) {
+            if (dropTblFut.complete(null))
+                removeListener(TableEvent.DROP, clo, null);
         }
-        catch (InterruptedException | ExecutionException e) {
-            LOG.error("Table wasn't dropped [name=" + name + ']', e);
+        else {
+            try {
+                configurationMgr
+                    .configurationRegistry()
+                    .getConfiguration(TablesConfiguration.KEY)
+                    .tables()
+                    .change(change -> change.delete(name))
+                    .get();
+            }
+            catch (InterruptedException | ExecutionException e) {
+                LOG.error("Table wasn't dropped [name=" + name + ']', e);
 
-            dropTblFut.completeExceptionally(e);
+                removeListener(TableEvent.DROP, clo, new IgniteInternalCheckedException(e));
+            }
         }
 
         dropTblFut.join();
@@ -670,12 +714,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         EventListener<TableEventParameters> clo = new EventListener<>() {
             @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
-                if (e instanceof ListenerRemovedException) {
-                    getTblFut.completeExceptionally(e);
-
-                    return true;
-                }
-
                 String tableName = parameters.tableName();
 
                 if (!name.equals(tableName))
@@ -700,7 +738,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
 
         if (tbl != null && getTblFut.complete(tbl) ||
             !isTableConfigured(name) && getTblFut.complete(null))
-            removeListener(TableEvent.CREATE, clo);
+            removeListener(TableEvent.CREATE, clo, null);
 
         return getTblFut.join();
     }
@@ -712,7 +750,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
      * @return True if table configured, false otherwise.
      */
     private boolean isTableConfigured(String name) {
-        return metaStorageMgr.get(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(name) + ".name")).join() != null;
+        return metaStorageMgr.invoke(Conditions.exists(
+            new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(name) + ".name")),
+            Operations.noop(),
+            Operations.noop()
+        ).join();
     }
 
     /**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 64322d7..75e3e36 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.table;
 
 import java.lang.reflect.Method;
-import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -74,6 +73,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -330,6 +330,33 @@ public class TableManagerTest {
     }
 
     /**
+     * Tries to create a table that already exists.
+     */
+    @Test
+    public void testDoubledCreateTable() {
+        CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
+
+        SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME)
+            .columns(
+                SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+                SchemaBuilders.column("val", ColumnType.INT64).asNullable().build())
+            .withPrimaryKey("key")
+            .build();
+
+        Table table = mockManagersAndCreateTable(scmTbl, tblManagerFut);
+
+        assertNotNull(table);
+
+        assertThrows(RuntimeException.class, () -> tblManagerFut.join().createTable(scmTbl.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl, tblCh)
+            .changeReplicas(1)
+            .changePartitions(10)));
+
+        assertSame(table, tblManagerFut.join().getOrCreateTable(scmTbl.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl, tblCh)
+            .changeReplicas(1)
+            .changePartitions(10)));
+    }
+
+    /**
      * Instantiates Table manager and creates a table in it.
      *
      * @param schemaTable Configuration schema for a table.
@@ -360,6 +387,10 @@ public class TableManagerTest {
 
         CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
 
+        String keyForCheck = PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name";
+
+        AtomicBoolean tableCreatedFlag = new AtomicBoolean();
+
         when(mm.invoke(any(Condition.class), any(Operation.class), any(Operation.class))).thenAnswer(invocation -> {
             Condition condition = invocation.getArgument(0);
 
@@ -367,10 +398,12 @@ public class TableManagerTest {
 
             Method getKeyMethod = ReflectionUtils.findMethod(internalCondition.getClass(), "key").get();
 
-            byte[] metastorageKeyBytes = (byte[])ReflectionUtils.invokeMethod(getKeyMethod, internalCondition);
+            String metastorageKey = new String((byte[])ReflectionUtils.invokeMethod(getKeyMethod, internalCondition));
 
-            tblIdFut.complete(UUID.fromString(new String(metastorageKeyBytes, StandardCharsets.UTF_8)
-                .substring(INTERNAL_PREFIX.length())));
+            if (keyForCheck.equals(metastorageKey))
+                return CompletableFuture.completedFuture(tableCreatedFlag.get());
+
+            tblIdFut.complete(UUID.fromString(metastorageKey.substring(INTERNAL_PREFIX.length())));
 
             return CompletableFuture.completedFuture(true);
         });
@@ -394,7 +427,7 @@ public class TableManagerTest {
         when(am.calculateAssignments(any(), eq(schemaTable.canonicalName()))).thenReturn(CompletableFuture.completedFuture(true));
 
         doAnswer(invocation -> {
-            EventListener<AffinityEventParameters> affinityClaculatedDelegate = invocation.getArgument(1);
+            EventListener<AffinityEventParameters> affinityCalculatedDelegate = invocation.getArgument(1);
 
             ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
 
@@ -403,7 +436,7 @@ public class TableManagerTest {
 
             assertTrue(tblIdFut.isDone());
 
-            CompletableFuture.supplyAsync(() -> affinityClaculatedDelegate.notify(
+            CompletableFuture.supplyAsync(() -> affinityCalculatedDelegate.notify(
                 new AffinityEventParameters(tblIdFut.join(), assignment),
                 null));
 
@@ -414,9 +447,6 @@ public class TableManagerTest {
 
         tblManagerFut.complete(tableManager);
 
-        when(mm.get(eq(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"))))
-            .thenReturn(CompletableFuture.completedFuture(null));
-
         when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
             Cursor<Entry> cursor = mock(Cursor.class);
 
@@ -437,8 +467,7 @@ public class TableManagerTest {
             if (!createTbl && !dropTbl)
                 return CompletableFuture.completedFuture(null);
 
-            when(mm.get(eq(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"))))
-                .thenAnswer(invocation -> CompletableFuture.completedFuture(createTbl ? mock(Entry.class) : null));
+            tableCreatedFlag.set(createTbl);
 
             when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invocation -> {
                 AtomicBoolean firstRecord = new AtomicBoolean(createTbl);