You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/06/11 12:55:30 UTC
[ignite-3] branch main updated: IGNITE-14773 Fixed an issue that
led to IgniteTables#table(String tableName) could return null for the not
fully initialized table. Fixes #149
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 9d631e1 IGNITE-14773 Fixed an issue that led to IgniteTables#table(String tableName) could return null for the not fully initialized table. Fixes #149
9d631e1 is described below
commit 9d631e1f98f231f015ec20efa029d689081be2c7
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Jun 11 15:54:53 2021 +0300
IGNITE-14773 Fixed an issue that led to IgniteTables#table(String tableName) could return null for the not fully initialized table. Fixes #149
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../internal/affinity/AffinityManagerTest.java | 33 ++-
.../main/java/org/apache/ignite/table/Table.java | 8 +
.../ignite/internal/manager/EventListener.java | 47 ++++
.../internal/manager/ListenerRemovedException.java | 26 ++
.../apache/ignite/internal/manager/Producer.java | 24 +-
.../client/ITMetaStorageServiceTest.java | 36 +--
.../server/raft/MetaStorageListener.java | 13 +-
.../internal/metastorage/MetaStorageManager.java | 16 +-
.../runner/app/DynamicTableCreationTest.java | 28 +-
.../ignite/internal/schema/SchemaManager.java | 40 ++-
.../ignite/distributed/ITDistributedTableTest.java | 1 +
.../ignite/internal/table/InternalTable.java | 7 +
.../apache/ignite/internal/table/TableImpl.java | 16 +-
.../internal/table/distributed/TableManager.java | 283 ++++++++++++++++-----
.../distributed/storage/InternalTableImpl.java | 17 +-
.../internal/table/event/TableEventParameters.java | 57 ++---
.../ignite/internal/table/TableManagerTest.java | 227 ++++++++++++++---
.../table/impl/DummyInternalTableImpl.java | 5 +
18 files changed, 647 insertions(+), 237 deletions(-)
diff --git a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
index 09b2064..1c61df3 100644
--- a/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
+++ b/modules/affinity/src/test/java/org/apache/ignite/internal/affinity/AffinityManagerTest.java
@@ -28,18 +28,22 @@ import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.configuration.storage.ConfigurationType;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
+import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
import org.apache.ignite.internal.baseline.BaselineManager;
+import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.apache.ignite.internal.vault.VaultManager;
-import org.apache.ignite.lang.ByteArray;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.internal.metastorage.client.Condition;
import org.apache.ignite.internal.metastorage.client.Entry;
import org.apache.ignite.internal.metastorage.client.EntryEvent;
import org.apache.ignite.internal.metastorage.client.Operation;
import org.apache.ignite.internal.metastorage.client.WatchEvent;
import org.apache.ignite.internal.metastorage.client.WatchListener;
+import org.apache.ignite.internal.util.ByteUtils;
+import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -186,7 +190,15 @@ public class AffinityManagerTest {
CompletableFuture<Boolean> assignmentCalculated = new CompletableFuture<>();
- affinityManager.listen(AffinityEvent.CALCULATED, (parameters, e) -> assignmentCalculated.complete(e == null));
+ affinityManager.listen(AffinityEvent.CALCULATED, new EventListener<AffinityEventParameters>() {
+ @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
+ return assignmentCalculated.complete(e == null);
+ }
+
+ @Override public void remove(@NotNull Throwable e) {
+ assignmentCalculated.completeExceptionally(e);
+ }
+ });
affinityManager.calculateAssignments(tblId, STATIC_TABLE_NAME);
@@ -200,7 +212,6 @@ public class AffinityManagerTest {
public void testRemovedAssignment() {
MetaStorageManager mm = mock(MetaStorageManager.class);
BaselineManager bm = mock(BaselineManager.class);
- VaultManager vm = mock(VaultManager.class);
UUID tblId = UUID.randomUUID();
@@ -242,7 +253,15 @@ public class AffinityManagerTest {
CompletableFuture<Boolean> assignmentRemoved = new CompletableFuture<>();
- affinityManager.listen(AffinityEvent.REMOVED, (parameters, e) -> assignmentRemoved.complete(e == null));
+ affinityManager.listen(AffinityEvent.REMOVED, new EventListener<AffinityEventParameters>() {
+ @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
+ return assignmentRemoved.complete(e == null);
+ }
+
+ @Override public void remove(@NotNull Throwable e) {
+ assignmentRemoved.completeExceptionally(e);
+ }
+ });
affinityManager.removeAssignment(tblId);
diff --git a/modules/api/src/main/java/org/apache/ignite/table/Table.java b/modules/api/src/main/java/org/apache/ignite/table/Table.java
index 80d7273..8c9cf5c 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/Table.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/Table.java
@@ -21,6 +21,7 @@ import org.apache.ignite.table.mapper.KeyMapper;
import org.apache.ignite.table.mapper.Mappers;
import org.apache.ignite.table.mapper.RecordMapper;
import org.apache.ignite.table.mapper.ValueMapper;
+import org.jetbrains.annotations.NotNull;
/**
* Table view of table provides methods to access table records regarding binary object concept.
@@ -39,6 +40,13 @@ import org.apache.ignite.table.mapper.ValueMapper;
*/
public interface Table extends TableView<Tuple> {
/**
+ * Gets a name of the table.
+ *
+ * @return Table name.
+ */
+ @NotNull String tableName();
+
+ /**
* Creates record view of table for record class mapper provided.
*
* @param recMapper Record class mapper.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
new file mode 100644
index 0000000..b0cb9e3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventListener.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.manager;
+
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The listener handles events from a producer.
+ * @see Producer#listen(Event, EventListener)
+ */
+public interface EventListener<P extends EventParameters> {
+
+ /**
+ * Notifies the listener about an event.
+ *
+ * @param parameters Parameters that describe the properties of the event. This argument cannot be {@code null}.
+ * @param exception Exception that occurred while the event was being produced, or {@code null}.
+ * @return {@code true} if the event is handled and the listener will be removed,
+ * {@code false} if the listener will keep listening.
+ */
+ boolean notify(@NotNull P parameters, @Nullable Throwable exception);
+
+ /**
+ * Notifies the listener that it has been removed from the event producer.
+ * Once the listener has been removed, it never receives a notification again.
+ *
+ * @param exception An exception which was the reason that the listener was removed.
+ * It cannot be {@code null}.
+ */
+ void remove(@NotNull Throwable exception);
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java
new file mode 100644
index 0000000..bff967a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/ListenerRemovedException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.manager;
+
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
+/**
+ * The exception is passed to a listener when the listener has been removed from the queue and will never receive a notification again.
+ */
+public class ListenerRemovedException extends IgniteInternalCheckedException {
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
index f9639af..296e5a5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -20,14 +20,13 @@ package org.apache.ignite.internal.manager;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.function.BiPredicate;
/**
* Interface which can produce its events.
*/
public abstract class Producer<T extends Event, P extends EventParameters> {
/** All listeners. */
- private ConcurrentHashMap<T, ConcurrentLinkedQueue<BiPredicate<P, Throwable>>> listeners = new ConcurrentHashMap<>();
+ private ConcurrentHashMap<T, ConcurrentLinkedQueue<EventListener<P>>> listeners = new ConcurrentHashMap<>();
/**
* Registers an event listener.
@@ -37,12 +36,23 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
* @param evt Event.
* @param closure Closure.
*/
- public void listen(T evt, BiPredicate<P, Throwable> closure) {
+ public void listen(T evt, EventListener<P> closure) {
listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>())
.offer(closure);
}
/**
+ * Removes a listener associated with the event.
+ *
+ * @param evt Event.
+ * @param closure Closure.
+ */
+ public void removeListener(T evt, EventListener<P> closure) {
+ if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure))
+ closure.remove(new ListenerRemovedException());
+ }
+
+ /**
* Notifies every listener that subscribed before.
*
* @param evt Event type.
@@ -50,19 +60,19 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
* @param err Exception when it was happened, or {@code null} otherwise.
*/
protected void onEvent(T evt, P params, Throwable err) {
- ConcurrentLinkedQueue<BiPredicate<P, Throwable>> queue = listeners.get(evt);
+ ConcurrentLinkedQueue<EventListener<P>> queue = listeners.get(evt);
if (queue == null)
return;
- BiPredicate<P, Throwable> closure;
+ EventListener<P> closure;
- Iterator<BiPredicate<P, Throwable>> iter = queue.iterator();
+ Iterator<EventListener<P>> iter = queue.iterator();
while (iter.hasNext()) {
closure = iter.next();
- if (closure.test(params, err))
+ if (closure.notify(params, err))
iter.remove();
}
}
diff --git a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
index 441e6b9..b7fc033 100644
--- a/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
+++ b/modules/metastorage-client/src/integrationTest/java/org/apache/ignite/internal/metastorage/client/ITMetaStorageServiceTest.java
@@ -531,20 +531,17 @@ public class ITMetaStorageServiceTest {
@Override public void close() throws Exception {
-
}
@NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
return it;
}
- @Override
- public boolean hasNext() {
+ @Override public boolean hasNext() {
return it.hasNext();
}
- @Override
- public org.apache.ignite.internal.metastorage.server.Entry next() {
+ @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
return it.next();
}
};
@@ -567,28 +564,23 @@ public class ITMetaStorageServiceTest {
MetaStorageService metaStorageSvc = prepareMetaStorage(
new AbstractKeyValueStorage() {
- @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+ @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo) {
assertArrayEquals(expKeyFrom.bytes(), keyFrom);
assertArrayEquals(expKeyTo.bytes(), keyTo);
- assertEquals(LATEST_REVISION, revUpperBound);
-
return new Cursor<>() {
private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
- @Override
- public boolean hasNext() {
+ @Override public boolean hasNext() {
return false;
}
- @Override
- public org.apache.ignite.internal.metastorage.server.Entry next() {
+ @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
return null;
}
};
@Override public void close() throws Exception {
-
}
@NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
@@ -620,13 +612,11 @@ public class ITMetaStorageServiceTest {
MetaStorageService metaStorageSvc = prepareMetaStorage(
new AbstractKeyValueStorage() {
- @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+ @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo) {
assertArrayEquals(expKeyFrom.bytes(), keyFrom);
assertNull(keyTo);
- assertEquals(LATEST_REVISION, revUpperBound);
-
return new Cursor<>() {
private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
@Override public boolean hasNext() {
@@ -639,7 +629,6 @@ public class ITMetaStorageServiceTest {
};
@Override public void close() throws Exception {
-
}
@NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
@@ -650,8 +639,7 @@ public class ITMetaStorageServiceTest {
return it.hasNext();
}
- @Override
- public org.apache.ignite.internal.metastorage.server.Entry next() {
+ @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
return it.next();
}
};
@@ -672,7 +660,7 @@ public class ITMetaStorageServiceTest {
MetaStorageService metaStorageSvc = prepareMetaStorage(
new AbstractKeyValueStorage() {
- @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+ @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo) {
return new Cursor<>() {
private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
@Override public boolean hasNext() {
@@ -685,7 +673,6 @@ public class ITMetaStorageServiceTest {
};
@Override public void close() throws Exception {
-
}
@NotNull @Override public Iterator<org.apache.ignite.internal.metastorage.server.Entry> iterator() {
@@ -696,8 +683,7 @@ public class ITMetaStorageServiceTest {
return it.hasNext();
}
- @Override
- public org.apache.ignite.internal.metastorage.server.Entry next() {
+ @Override public org.apache.ignite.internal.metastorage.server.Entry next() {
return it.next();
}
};
@@ -718,7 +704,7 @@ public class ITMetaStorageServiceTest {
public void testRangeNext() throws Exception {
MetaStorageService metaStorageSvc = prepareMetaStorage(
new AbstractKeyValueStorage() {
- @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+ @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo) {
return new Cursor<>() {
private final Iterator<org.apache.ignite.internal.metastorage.server.Entry> it = new Iterator<>() {
@Override public boolean hasNext() {
@@ -768,7 +754,7 @@ public class ITMetaStorageServiceTest {
MetaStorageService metaStorageSvc = prepareMetaStorage(
new AbstractKeyValueStorage() {
- @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound) {
+ @Override public Cursor<org.apache.ignite.internal.metastorage.server.Entry> range(byte[] keyFrom, byte[] keyTo) {
return cursorMock;
}
});
diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
index 3a1410f..937ec49 100644
--- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
+++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/raft/MetaStorageListener.java
@@ -221,11 +221,14 @@ public class MetaStorageListener implements RaftGroupListener {
IgniteUuid cursorId = new IgniteUuid(UUID.randomUUID(), 0L);
- Cursor<Entry> cursor = storage.range(
- rangeCmd.keyFrom(),
- rangeCmd.keyTo(),
- rangeCmd.revUpperBound()
- );
+ Cursor<Entry> cursor = (rangeCmd.revUpperBound() != -1) ?
+ storage.range(
+ rangeCmd.keyFrom(),
+ rangeCmd.keyTo(),
+ rangeCmd.revUpperBound()) :
+ storage.range(
+ rangeCmd.keyFrom(),
+ rangeCmd.keyTo());
cursors.put(cursorId, new IgniteBiTuple<>(cursor, CursorType.RANGE));
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index d56a550..ff0d3c8 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -29,7 +29,14 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.internal.ConfigurationManager;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
+import org.apache.ignite.internal.metastorage.client.CompactedException;
+import org.apache.ignite.internal.metastorage.client.Condition;
+import org.apache.ignite.internal.metastorage.client.Entry;
+import org.apache.ignite.internal.metastorage.client.MetaStorageService;
import org.apache.ignite.internal.metastorage.client.MetaStorageServiceImpl;
+import org.apache.ignite.internal.metastorage.client.Operation;
+import org.apache.ignite.internal.metastorage.client.OperationTimeoutException;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
import org.apache.ignite.internal.metastorage.watch.AggregatedWatch;
@@ -43,13 +50,6 @@ import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.internal.metastorage.client.CompactedException;
-import org.apache.ignite.internal.metastorage.client.Condition;
-import org.apache.ignite.internal.metastorage.client.Entry;
-import org.apache.ignite.internal.metastorage.client.MetaStorageService;
-import org.apache.ignite.internal.metastorage.client.Operation;
-import org.apache.ignite.internal.metastorage.client.OperationTimeoutException;
-import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
import org.jetbrains.annotations.NotNull;
@@ -544,7 +544,7 @@ public class MetaStorageManager {
/** {@inheritDoc} */
@Override public void close() throws Exception {
- innerCursorFut.thenCompose(cursor -> {
+ innerCursorFut.thenApply(cursor -> {
try {
cursor.close();
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
index 6c76f04..55afa3e 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/DynamicTableCreationTest.java
@@ -106,14 +106,14 @@ class DynamicTableCreationTest {
);
// Put data on node 1.
- Table tbl1 = waitForTable(schTbl1.canonicalName(), clusterNodes.get(1));
+ Table tbl1 = clusterNodes.get(1).tables().table(schTbl1.canonicalName());
KeyValueBinaryView kvView1 = tbl1.kvView();
tbl1.insert(tbl1.tupleBuilder().set("key", 1L).set("val", 111).build());
kvView1.put(tbl1.tupleBuilder().set("key", 2L).build(), tbl1.tupleBuilder().set("val", 222).build());
// Get data on node 2.
- Table tbl2 = waitForTable(schTbl1.canonicalName(), clusterNodes.get(2));
+ Table tbl2 = clusterNodes.get(2).tables().table(schTbl1.canonicalName());
KeyValueBinaryView kvView2 = tbl2.kvView();
final Tuple keyTuple1 = tbl2.tupleBuilder().set("key", 1L).build();
@@ -131,26 +131,6 @@ class DynamicTableCreationTest {
}
/**
- * Waits for table, until it is initialized.
- *
- * @param tableName Table name
- * @param ign Ignite.
- * @return Table.
- */
- private Table waitForTable(String tableName, Ignite ign) {
- while (ign.tables().table(tableName) == null) {
- try {
- Thread.sleep(100);
- }
- catch (InterruptedException e) {
- LOG.warn("Waiting for table " + tableName + " is interrupted.");
- }
- }
-
- return ign.tables().table(tableName);
- }
-
- /**
* Check dynamic table creation.
*/
@Test
@@ -186,7 +166,7 @@ class DynamicTableCreationTest {
final UUID uuid2 = UUID.randomUUID();
// Put data on node 1.
- Table tbl1 = waitForTable(scmTbl1.canonicalName(), clusterNodes.get(1));
+ Table tbl1 = clusterNodes.get(1).tables().table(scmTbl1.canonicalName());
KeyValueBinaryView kvView1 = tbl1.kvView();
tbl1.insert(tbl1.tupleBuilder().set("key", uuid).set("affKey", 42L)
@@ -196,7 +176,7 @@ class DynamicTableCreationTest {
kvView1.tupleBuilder().set("valStr", "String value 2").set("valInt", 7373).set("valNull", null).build());
// Get data on node 2.
- Table tbl2 = waitForTable(scmTbl1.canonicalName(), clusterNodes.get(2));
+ Table tbl2 = clusterNodes.get(2).tables().table(scmTbl1.canonicalName());
KeyValueBinaryView kvView2 = tbl2.kvView();
final Tuple keyTuple1 = tbl2.tupleBuilder().set("key", uuid).set("affKey", 42L).build();
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
index e25521d..87bb004 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaManager.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.schema;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -29,6 +30,12 @@ import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Conditions;
+import org.apache.ignite.internal.metastorage.client.Entry;
+import org.apache.ignite.internal.metastorage.client.EntryEvent;
+import org.apache.ignite.internal.metastorage.client.Operations;
+import org.apache.ignite.internal.metastorage.client.WatchEvent;
+import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
import org.apache.ignite.internal.schema.event.SchemaEvent;
@@ -39,13 +46,8 @@ import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.internal.metastorage.client.Conditions;
-import org.apache.ignite.internal.metastorage.client.Entry;
-import org.apache.ignite.internal.metastorage.client.EntryEvent;
-import org.apache.ignite.internal.metastorage.client.Operations;
-import org.apache.ignite.internal.metastorage.client.WatchEvent;
-import org.apache.ignite.internal.metastorage.client.WatchListener;
import org.apache.ignite.schema.SchemaTable;
import org.jetbrains.annotations.NotNull;
@@ -251,13 +253,35 @@ public class SchemaManager extends Producer<SchemaEvent, SchemaEventParameters>
Set<ByteArray> keys = new HashSet<>();
- try (Cursor<Entry> cursor = metaStorageMgr.range(new ByteArray(schemaPrefix), null)) {
+ IgniteBiTuple<ByteArray, ByteArray> range = toRange(new ByteArray(schemaPrefix));
+
+ try (Cursor<Entry> cursor = metaStorageMgr.range(range.get1(), range.get2())) {
cursor.forEach(entry -> keys.add(entry.key()));
}
catch (Exception e) {
- LOG.error("Can't remove schemas for the table [tblId=" + tableId + ']');
+ LOG.error("Can't remove schemas for the table [tblId=" + tableId + ']', e);
}
return fut.thenCompose(r -> metaStorageMgr.removeAll(keys)).thenApply(v -> true);
}
+
+ /**
+ * Transforms prefix bytes into a key range.
+ * This method should be replaced with a direct call to a range-by-prefix operation
+ * in the Meta storage manager once it is implemented.
+ * TODO: IGNITE-14799
+ *
+ * @param prefixKey Prefix bytes.
+ * @return Tuple with left and right borders for range.
+ */
+ private IgniteBiTuple<ByteArray, ByteArray> toRange(ByteArray prefixKey) {
+ var bytes = Arrays.copyOf(prefixKey.bytes(), prefixKey.bytes().length);
+
+ if (bytes[bytes.length - 1] != Byte.MAX_VALUE)
+ bytes[bytes.length - 1]++;
+ else
+ bytes = Arrays.copyOf(bytes, bytes.length + 1);
+
+ return new IgniteBiTuple<>(prefixKey, new ByteArray(bytes));
+ }
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index 428dd91..cab6b97 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -257,6 +257,7 @@ public class ITDistributedTableTest {
}
Table tbl = new TableImpl(new InternalTableImpl(
+ "tbl",
UUID.randomUUID(),
partMap,
PARTS
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index 4134e33..619fb67 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -36,6 +36,13 @@ public interface InternalTable {
@NotNull UUID tableId();
/**
+ * Gets a name of the table.
+ *
+ * @return Table name.
+ */
+ @NotNull String tableName();
+
+ /**
* Asynchronously gets a row with same key columns values as given one from the table.
*
* @param keyRow Row with key columns set.
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index a7cee35..99c8c8d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -22,13 +22,14 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.KeyValueView;
@@ -61,12 +62,17 @@ public class TableImpl extends AbstractTableView implements Table {
}
/**
- * Gets an internal table associated with the table.
+ * Gets a table id.
*
- * @return Internal table.
+ * @return Table id as UUID.
*/
- public @NotNull InternalTable internalTable() {
- return tbl;
+ public @NotNull UUID tableId() {
+ return tbl.tableId();
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull String tableName() {
+ return tbl.tableName();
}
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 4291047..a26873d 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.table.distributed;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,17 +30,22 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.function.BiPredicate;
import java.util.function.Consumer;
import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.internal.util.ConfigurationUtil;
import org.apache.ignite.configuration.schemas.table.TableChange;
import org.apache.ignite.configuration.schemas.table.TableView;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
import org.apache.ignite.internal.affinity.AffinityManager;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
+import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.manager.ListenerRemovedException;
import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Conditions;
+import org.apache.ignite.internal.metastorage.client.Entry;
+import org.apache.ignite.internal.metastorage.client.Operations;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -50,16 +56,17 @@ import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.table.event.TableEventParameters;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.internal.metastorage.client.Conditions;
-import org.apache.ignite.internal.metastorage.client.Operations;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.manager.IgniteTables;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
* Table manager.
@@ -71,6 +78,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** Internal prefix for the metasorage. */
private static final String INTERNAL_PREFIX = "internal.tables.";
+ /** Public prefix for metastorage. */
+ private static final String PUBLIC_PREFIX = "dst-cfg.table.tables.";
+
/** Meta storage service. */
private final MetaStorageManager metaStorageMgr;
@@ -142,16 +152,13 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
));
}
- InternalTableImpl internalTable = new InternalTableImpl(tblId, partitionMap, partitions);
+ InternalTableImpl internalTable = new InternalTableImpl(name, tblId, partitionMap, partitions);
+
+ var table = new TableImpl(internalTable, schemaReg);
- tables.put(name, new TableImpl(internalTable, schemaReg));
+ tables.put(name, table);
- onEvent(TableEvent.CREATE, new TableEventParameters(
- tblId,
- name,
- schemaReg,
- internalTable
- ), null);
+ onEvent(TableEvent.CREATE, new TableEventParameters(table), null);
}
/**
@@ -171,12 +178,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
assert table != null : "There is no table with the name specified [name=" + name + ']';
- onEvent(TableEvent.DROP, new TableEventParameters(
- tblId,
- name,
- table.schemaView(),
- table.internalTable()
- ), null);
+ onEvent(TableEvent.DROP, new TableEventParameters(table), null);
}
/**
@@ -229,12 +231,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
.exceptionally(e -> {
LOG.error("Failed to create a new table [name=" + tblName + ", id=" + tblId + ']', e);
- onEvent(TableEvent.CREATE, new TableEventParameters(
- tblId,
- tblName,
- null,
- null
- ), e);
+ onEvent(TableEvent.CREATE, new TableEventParameters(tblId, tblName), e);
return null;
})
@@ -245,28 +242,40 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
schemaReadyFut.join().schemaRegistry()
));
- affMgr.listen(AffinityEvent.CALCULATED, (parameters, e) -> {
- if (!tblId.equals(parameters.tableId()))
- return false;
+ affMgr.listen(AffinityEvent.CALCULATED, new EventListener<AffinityEventParameters>() {
+ @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
+ if (!tblId.equals(parameters.tableId()))
+ return false;
- if (e == null)
- affinityReadyFut.complete(parameters);
- else
- affinityReadyFut.completeExceptionally(e);
+ if (e == null)
+ affinityReadyFut.complete(parameters);
+ else
+ affinityReadyFut.completeExceptionally(e);
- return true;
+ return true;
+ }
+
+ @Override public void remove(@NotNull Throwable e) {
+ affinityReadyFut.completeExceptionally(e);
+ }
});
- schemaMgr.listen(SchemaEvent.INITIALIZED, (parameters, e) -> {
- if (!tblId.equals(parameters.tableId()))
- return false;
+ schemaMgr.listen(SchemaEvent.INITIALIZED, new EventListener<SchemaEventParameters>() {
+ @Override public boolean notify(@NotNull SchemaEventParameters parameters, @Nullable Throwable e) {
+ if (!tblId.equals(parameters.tableId()))
+ return false;
- if (e == null)
- schemaReadyFut.complete(parameters);
- else
- schemaReadyFut.completeExceptionally(e);
+ if (e == null)
+ schemaReadyFut.complete(parameters);
+ else
+ schemaReadyFut.completeExceptionally(e);
- return true;
+ return true;
+ }
+
+ @Override public void remove(@NotNull Throwable e) {
+ schemaReadyFut.completeExceptionally(e);
+ }
});
}
@@ -278,7 +287,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
for (String tblName : tablesToStop) {
TableImpl t = tables.get(tblName);
- UUID tblId = t.internalTable().tableId();
+ UUID tblId = t.tableId();
if (hasMetastorageLocally) {
var key = new ByteArray(INTERNAL_PREFIX + tblId);
@@ -291,24 +300,22 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
Operations.noop())));
}
- affMgr.listen(AffinityEvent.REMOVED, (parameters, e) -> {
- if (!tblId.equals(parameters.tableId()))
- return false;
-
- if (e == null)
- dropTableLocally(tblName, tblId, parameters.assignment());
- else {
- LOG.error("Failed to drop a table [name=" + tblName + ", id=" + tblId + ']', e);
-
- onEvent(TableEvent.DROP, new TableEventParameters(
- tblId,
- tblName,
- null,
- null
- ), e);
+ affMgr.listen(AffinityEvent.REMOVED, new EventListener<AffinityEventParameters>() {
+ @Override public boolean notify(@NotNull AffinityEventParameters parameters, @Nullable Throwable e) {
+ if (!tblId.equals(parameters.tableId()))
+ return false;
+
+ if (e == null)
+ dropTableLocally(tblName, tblId, parameters.assignment());
+ else
+ onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
+
+ return true;
}
- return true;
+ @Override public void remove(@NotNull Throwable e) {
+ onEvent(TableEvent.DROP, new TableEventParameters(tblId, tblName), e);
+ }
});
}
@@ -320,19 +327,24 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
@Override public Table createTable(String name, Consumer<TableChange> tableInitChange) {
CompletableFuture<Table> tblFut = new CompletableFuture<>();
- listen(TableEvent.CREATE, (params, e) -> {
- String tableName = params.tableName();
+ listen(TableEvent.CREATE, new EventListener<TableEventParameters>() {
+ @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
+ String tableName = parameters.tableName();
- if (!name.equals(tableName))
- return false;
+ if (!name.equals(tableName))
+ return false;
- if (e == null) {
- tblFut.complete(tables.get(name));
+ if (e == null)
+ tblFut.complete(parameters.table());
+ else
+ tblFut.completeExceptionally(e);
+
+ return true;
}
- else
- tblFut.completeExceptionally(e);
- return true;
+ @Override public void remove(@NotNull Throwable e) {
+ tblFut.completeExceptionally(e);
+ }
});
try {
@@ -353,9 +365,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
@Override public void dropTable(String name) {
CompletableFuture<Void> dropTblFut = new CompletableFuture<>();
- listen(TableEvent.DROP, new BiPredicate<>() {
- @Override public boolean test(TableEventParameters params, Throwable e) {
- String tableName = params.tableName();
+ listen(TableEvent.DROP, new EventListener<TableEventParameters>() {
+ @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
+ String tableName = parameters.tableName();
if (!name.equals(tableName))
return false;
@@ -372,6 +384,10 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return true;
}
+
+ @Override public void remove(@NotNull Throwable e) {
+ dropTblFut.completeExceptionally(e);
+ }
});
try {
@@ -392,11 +408,138 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
/** {@inheritDoc} */
@Override public List<Table> tables() {
- return new ArrayList<>(tables.values());
+ ArrayList<Table> tables = new ArrayList<>();
+
+ for (String tblName : tableNamesConfigured()) {
+ Table tbl = table(tblName, false);
+
+ if (tbl != null)
+ tables.add(tbl);
+ }
+
+ return tables;
+ }
+
+ /**
+ * Collects a set of table names from the distributed configuration storage.
+ *
+ * @return A set of table names.
+ */
+ private HashSet<String> tableNamesConfigured() {
+ IgniteBiTuple<ByteArray, ByteArray> range = toRange(new ByteArray(PUBLIC_PREFIX));
+
+ HashSet tableNames = new HashSet();
+
+ try (Cursor<Entry> cursor = metaStorageMgr.range(range.get1(), range.get2())) {
+ while (cursor.hasNext()) {
+ Entry entry = cursor.next();
+
+ String keyTail = entry.key().toString().substring(PUBLIC_PREFIX.length());
+
+ int idx = -1;
+
+ while ((idx = keyTail.indexOf('.', idx + 1)) > 0 && keyTail.charAt(idx - 1) == '\\');
+
+ String tablName = keyTail.substring(0, idx);
+
+ tableNames.add(ConfigurationUtil.unescape(tablName));
+ }
+ }
+ catch (Exception e) {
+ LOG.error("Can't get table names.", e);
+ }
+
+ return tableNames;
}
/** {@inheritDoc} */
@Override public Table table(String name) {
- return tables.get(name);
+ return table(name, true);
+ }
+
+ /**
+ * Gets a table if it exists or {@code null} if it was not created or was removed before.
+ *
+ * @param name Table name.
+ * @param checkConfiguration True if the method should check the configuration before trying to get the table,
+ * false otherwise.
+ * @return A table or {@code null} if table does not exist.
+ */
+ private Table table(String name, boolean checkConfiguration) {
+ if (checkConfiguration && !isTableConfigured(name))
+ return null;
+
+ Table tbl = tables.get(name);
+
+ if (tbl != null)
+ return tbl;
+
+ CompletableFuture<Table> getTblFut = new CompletableFuture<>();
+
+ EventListener<TableEventParameters> clo = new EventListener<TableEventParameters>() {
+ @Override public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable e) {
+ if (e instanceof ListenerRemovedException) {
+ getTblFut.completeExceptionally(e);
+
+ return true;
+ }
+
+ String tableName = parameters.tableName();
+
+ if (!name.equals(tableName))
+ return false;
+
+ if (e == null)
+ getTblFut.complete(parameters.table());
+ else
+ getTblFut.completeExceptionally(e);
+
+ return true;
+ }
+
+ @Override public void remove(@NotNull Throwable e) {
+ getTblFut.completeExceptionally(e);
+ }
+ };
+
+ listen(TableEvent.CREATE, clo);
+
+ tbl = tables.get(name);
+
+ if (tbl != null && getTblFut.complete(tbl) ||
+ !isTableConfigured(name) && getTblFut.complete(null))
+ removeListener(TableEvent.CREATE, clo);
+
+ return getTblFut.join();
+ }
+
+ /**
+ * Checks that the table is configured.
+ *
+ * @param name Table name.
+ * @return True if table configured, false otherwise.
+ */
+ private boolean isTableConfigured(String name) {
+ return metaStorageMgr.get(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(name) + ".name")).join() != null;
+ }
+
+ /**
+ * Transforms prefix bytes into a key range.
+ * This method should be replaced with a direct call of range-by-prefix
+ * in the Meta storage manager once it is implemented.
+ * TODO: IGNITE-14799
+ *
+ * @param prefixKey Prefix bytes.
+ * @return Tuple with left and right borders for range.
+ */
+ private IgniteBiTuple<ByteArray, ByteArray> toRange(ByteArray prefixKey) {
+ var bytes = Arrays.copyOf(prefixKey.bytes(), prefixKey.bytes().length);
+
+ if (bytes[bytes.length - 1] != Byte.MAX_VALUE)
+ bytes[bytes.length - 1]++;
+ else
+ bytes = Arrays.copyOf(bytes, bytes.length + 1);
+
+ return new IgniteBiTuple<>(prefixKey, new ByteArray(bytes));
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 8b5213c..13afd67 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -56,34 +56,41 @@ public class InternalTableImpl implements InternalTable {
/** Partitions. */
private int partitions;
+ /** Table name. */
+ private String tableName;
+
/** Table identifier. */
private UUID tableId;
/**
+ * @param tableName Table name.
* @param tableId Table id.
* @param partMap Map partition id to raft group.
* @param partitions Partitions.
*/
public InternalTableImpl(
+ String tableName,
UUID tableId,
Map<Integer, RaftGroupService> partMap,
int partitions
) {
+ this.tableName = tableName;
this.tableId = tableId;
this.partitionMap = partMap;
this.partitions = partitions;
}
- /**
- * Gets a table id.
- *
- * @return Table id as UUID.
- */
+ /** {@inheritDoc} */
@Override public @NotNull UUID tableId() {
return tableId;
}
/** {@inheritDoc} */
+ @Override public String tableName() {
+ return tableName;
+ }
+
+ /** {@inheritDoc} */
@Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
return partitionMap.get(partId(keyRow)).<SingleRowResponse>run(new GetCommand(keyRow))
.thenApply(SingleRowResponse::getValue);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
index 4e0a750..e35068e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
@@ -19,12 +19,11 @@ package org.apache.ignite.internal.table.event;
import java.util.UUID;
import org.apache.ignite.internal.manager.EventParameters;
-import org.apache.ignite.internal.table.InternalTable;
-import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.Table;
/**
- * Table event parameters.
- * There are properties which associate with a concrete table.
+ * Table event parameters. These are the properties associated with a concrete table.
*/
public class TableEventParameters implements EventParameters {
/** Table identifier. */
@@ -33,28 +32,33 @@ public class TableEventParameters implements EventParameters {
/** Table name. */
private final String tableName;
- /** Table schema view. */
- private final SchemaRegistry schemaRegistry;
+ /** Table instance. */
+ private final TableImpl table;
- /** Internal table. */
- private final InternalTable internalTable;
+ /**
+ * @param table Table instance.
+ */
+ public TableEventParameters(TableImpl table) {
+ this(table.tableId(), table.tableName(), table);
+ }
+
+ /**
+ * @param tableId Table identifier.
+ * @param tableName Table name.
+ */
+ public TableEventParameters(UUID tableId, String tableName) {
+ this(tableId, tableName, null);
+ }
/**
* @param tableId Table identifier.
* @param tableName Table name.
- * @param schemaRegistry Table schema view.
- * @param internalTable Internal table.
+ * @param table Table instance.
*/
- public TableEventParameters(
- UUID tableId,
- String tableName,
- SchemaRegistry schemaRegistry,
- InternalTable internalTable
- ) {
+ public TableEventParameters(UUID tableId, String tableName, TableImpl table) {
this.tableId = tableId;
this.tableName = tableName;
- this.schemaRegistry = schemaRegistry;
- this.internalTable = internalTable;
+ this.table = table;
}
/**
@@ -76,20 +80,11 @@ public class TableEventParameters implements EventParameters {
}
/**
- * Gets a schema view for the table.
- *
- * @return Schema descriptor.
- */
- public SchemaRegistry tableSchemaView() {
- return schemaRegistry;
- }
-
- /**
- * Gets an internal table associated with the table.
+ * Gets a table instance associated with the event.
*
- * @return Internal table.
+ * @return Table.
*/
- public InternalTable internalTable() {
- return internalTable;
+ public Table table() {
+ return table;
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 24b6ea1..27b06b5 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -21,13 +21,16 @@ import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.function.BiPredicate;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.configuration.RootKey;
import org.apache.ignite.configuration.internal.ConfigurationManager;
+import org.apache.ignite.configuration.internal.util.ConfigurationUtil;
import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
import org.apache.ignite.configuration.schemas.runner.NodeConfiguration;
import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
@@ -35,7 +38,11 @@ import org.apache.ignite.configuration.storage.ConfigurationType;
import org.apache.ignite.internal.affinity.AffinityManager;
import org.apache.ignite.internal.affinity.event.AffinityEvent;
import org.apache.ignite.internal.affinity.event.AffinityEventParameters;
+import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
+import org.apache.ignite.internal.metastorage.client.Condition;
+import org.apache.ignite.internal.metastorage.client.Entry;
+import org.apache.ignite.internal.metastorage.client.Operation;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.SchemaRegistry;
@@ -43,23 +50,26 @@ import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConver
import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.vault.VaultManager;
+import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.internal.metastorage.client.Condition;
-import org.apache.ignite.internal.metastorage.client.Operation;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.schema.ColumnType;
import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.SchemaTable;
import org.apache.ignite.table.Table;
+import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.platform.commons.util.ReflectionUtils;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -79,6 +89,9 @@ public class TableManagerTest {
/** Internal prefix for the metasorage. */
private static final String INTERNAL_PREFIX = "internal.tables.";
+ /** Public prefix for metastorage. */
+ private static final String PUBLIC_PREFIX = "dst-cfg.table.tables.";
+
/** The name of the table which is statically configured. */
private static final String STATIC_TABLE_NAME = "t1";
@@ -194,11 +207,18 @@ public class TableManagerTest {
ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
- CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
+ CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
- TableManager tableManager = mockManagersAndCreateTable(DYNAMIC_TABLE_NAME, mm, sm, am, rm, vm, node, tblIdFut);
+ SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_NAME).columns(
+ SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+ SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
+ ).withPrimaryKey("key").build();
- assertNotNull(tableManager.table("PUBLIC." + DYNAMIC_TABLE_NAME));
+ Table table = mockManagersAndCreateTable(scmTbl, mm, sm, am, rm, vm, node, tblManagerFut);
+
+ assertNotNull(table);
+
+ assertSame(table, tblManagerFut.join().table(scmTbl.canonicalName()));
}
/**
@@ -214,23 +234,26 @@ public class TableManagerTest {
ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
- CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
+ CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
- TableManager tableManager = mockManagersAndCreateTable(DYNAMIC_TABLE_FOR_DROP_NAME, mm, sm, am, rm, vm, node, tblIdFut);
+ SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
+ SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+ SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
+ ).withPrimaryKey("key").build();
- assertNotNull(tableManager.table("PUBLIC." + DYNAMIC_TABLE_FOR_DROP_NAME));
+ TableImpl table = mockManagersAndCreateTable(scmTbl, mm, sm, am, rm, vm, node, tblManagerFut);
+
+ TableManager tableManager = tblManagerFut.join();
when(sm.unregisterSchemas(any())).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invokation -> {
- BiPredicate<SchemaEventParameters, Exception> schemaInitialized = invokation.getArgument(1);
-
- assertTrue(tblIdFut.isDone());
+ EventListener<SchemaEventParameters> schemaInitialized = invokation.getArgument(1);
SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
- CompletableFuture.supplyAsync(() -> schemaInitialized.test(
- new SchemaEventParameters(tblIdFut.join(), schemaRegistry),
+ CompletableFuture.supplyAsync(() -> schemaInitialized.notify(
+ new SchemaEventParameters(table.tableId(), schemaRegistry),
null));
return null;
@@ -239,53 +262,129 @@ public class TableManagerTest {
when(am.removeAssignment(any())).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invokation -> {
- BiPredicate<AffinityEventParameters, Exception> affinityRemovedDelegate = (BiPredicate)invokation.getArgument(1);
+ EventListener<AffinityEventParameters> affinityRemovedDelegate = invokation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
- for (int part = 0; part < PARTITIONS; part++) {
+ for (int part = 0; part < PARTITIONS; part++)
assignment.add(new ArrayList<ClusterNode>(Collections.singleton(node)));
- }
-
- assertTrue(tblIdFut.isDone());
- CompletableFuture.supplyAsync(() -> affinityRemovedDelegate.test(
- new AffinityEventParameters(tblIdFut.join(), assignment),
+ CompletableFuture.supplyAsync(() -> affinityRemovedDelegate.notify(
+ new AffinityEventParameters(table.tableId(), assignment),
null));
return null;
}).when(am).listen(same(AffinityEvent.REMOVED), any());
- tableManager.dropTable("PUBLIC." + DYNAMIC_TABLE_FOR_DROP_NAME);
+ tableManager.dropTable(scmTbl.canonicalName());
+
+ assertNull(tableManager.table(scmTbl.canonicalName()));
+ }
+
+ /**
+ * Checks that table lookups issued while a table is still being created return the same instance as the create call.
+ */
+ @Test
+ public void testGetTableDuringCreation() throws Exception {
+ MetaStorageManager mm = mock(MetaStorageManager.class);
+ SchemaManager sm = mock(SchemaManager.class);
+ AffinityManager am = mock(AffinityManager.class);
+ Loza rm = mock(Loza.class);
+ VaultManager vm = mock(VaultManager.class);
+
+ ClusterNode node = new ClusterNode(UUID.randomUUID().toString(), NODE_NAME, "127.0.0.1", PORT);
+
+ CompletableFuture<TableManager> tblManagerFut = new CompletableFuture<>();
+
+ SchemaTable scmTbl = SchemaBuilders.tableBuilder("PUBLIC", DYNAMIC_TABLE_FOR_DROP_NAME).columns(
+ SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
+ SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
+ ).withPrimaryKey("key").build();
+
+ Phaser phaser = new Phaser(2);
+
+ CompletableFuture<Table> createFut = CompletableFuture.supplyAsync(() ->
+ mockManagersAndCreateTableWithDelay(scmTbl, mm, sm, am, rm, vm, node, tblManagerFut, phaser)
+ );
+
+ CompletableFuture<Table> getFut = CompletableFuture.supplyAsync(() -> {
+ phaser.awaitAdvance(0);
+
+ return tblManagerFut.join().table(scmTbl.canonicalName());
+ });
+
+ CompletableFuture<Collection<Table>> getAllTablesFut = CompletableFuture.supplyAsync(() -> {
+ phaser.awaitAdvance(0);
- assertNull(tableManager.table("PUBLIC." + DYNAMIC_TABLE_FOR_DROP_NAME));
+ return tblManagerFut.join().tables();
+ });
+
+ assertFalse(createFut.isDone());
+ assertFalse(getFut.isDone());
+ assertFalse(getAllTablesFut.isDone());
+
+ phaser.arrive();
+
+ assertSame(createFut.join(), getFut.join());
+
+ assertEquals(1, getAllTablesFut.join().size());
}
/**
* Instantiates Table manager and creates a table in it.
*
- * @param tableName Table name.
+ * @param schemaTable Configuration schema for a table.
+ * @param mm Metastorage manager mock.
+ * @param sm Schema manager mock.
+ * @param am Affinity manager mock.
+ * @param rm Raft manager mock.
+ * @param vm Vault manager mock.
+ * @param node This cluster node.
+ * @param tblManagerFut Future for table manager.
+ * @return Table.
+ */
+ private TableImpl mockManagersAndCreateTable(
+ SchemaTable schemaTable,
+ MetaStorageManager mm,
+ SchemaManager sm,
+ AffinityManager am,
+ Loza rm,
+ VaultManager vm,
+ ClusterNode node,
+ CompletableFuture<TableManager> tblManagerFut
+ ) {
+ return mockManagersAndCreateTableWithDelay(schemaTable, mm, sm, am, rm, vm, node, tblManagerFut, null);
+ }
+
+ /**
+ * Instantiates a table and prepares Table manager. If a phaser is passed, the configuration listener waits on it when the table is created or dropped.
+ *
+ * @param schemaTable Configuration schema for a table.
* @param mm Metastorage manager mock.
* @param sm Schema manager mock.
* @param am Affinity manager mock.
* @param rm Raft manager mock.
* @param vm Vault manager mock.
* @param node This cluster node.
- * @param tblIdFut Future which will determine a table id.
+ * @param tblManagerFut Future for table manager.
+ * @param phaser Phaser for the wait, or {@code null} to create the table without a delay.
* @return Table manager.
*/
- private TableManager mockManagersAndCreateTable(
- String tableName,
+ @NotNull private TableImpl mockManagersAndCreateTableWithDelay(
+ SchemaTable schemaTable,
MetaStorageManager mm,
SchemaManager sm,
AffinityManager am,
Loza rm,
VaultManager vm,
ClusterNode node,
- CompletableFuture<UUID> tblIdFut
+ CompletableFuture<TableManager> tblManagerFut,
+ Phaser phaser
) {
when(mm.hasMetastorageLocally(any())).thenReturn(true);
+ CompletableFuture<UUID> tblIdFut = new CompletableFuture<>();
+
when(mm.invoke((Condition)any(), (Operation)any(), (Operation)any())).thenAnswer(invokation -> {
Condition condition = (Condition)invokation.getArgument(0);
@@ -301,36 +400,35 @@ public class TableManagerTest {
return CompletableFuture.completedFuture(true);
});
- when(sm.initSchemaForTable(any(), eq(tableName))).thenReturn(CompletableFuture.completedFuture(true));
+ when(sm.initSchemaForTable(any(), eq(schemaTable.canonicalName()))).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invokation -> {
- BiPredicate<SchemaEventParameters, Exception> schemaInitialized = invokation.getArgument(1);
+ EventListener<SchemaEventParameters> schemaInitialized = invokation.getArgument(1);
assertTrue(tblIdFut.isDone());
SchemaRegistry schemaRegistry = mock(SchemaRegistry.class);
- CompletableFuture.supplyAsync(() -> schemaInitialized.test(
+ CompletableFuture.supplyAsync(() -> schemaInitialized.notify(
new SchemaEventParameters(tblIdFut.join(), schemaRegistry),
null));
return null;
}).when(sm).listen(same(SchemaEvent.INITIALIZED), any());
- when(am.calculateAssignments(any(), eq(tableName))).thenReturn(CompletableFuture.completedFuture(true));
+ when(am.calculateAssignments(any(), eq(schemaTable.canonicalName()))).thenReturn(CompletableFuture.completedFuture(true));
doAnswer(invokation -> {
- BiPredicate<AffinityEventParameters, Exception> affinityClaculatedDelegate = (BiPredicate)invokation.getArgument(1);
+ EventListener<AffinityEventParameters> affinityClaculatedDelegate = invokation.getArgument(1);
ArrayList<List<ClusterNode>> assignment = new ArrayList<>(PARTITIONS);
- for (int part = 0; part < PARTITIONS; part++) {
+ for (int part = 0; part < PARTITIONS; part++)
assignment.add(new ArrayList<ClusterNode>(Collections.singleton(node)));
- }
assertTrue(tblIdFut.isDone());
- CompletableFuture.supplyAsync(() -> affinityClaculatedDelegate.test(
+ CompletableFuture.supplyAsync(() -> affinityClaculatedDelegate.notify(
new AffinityEventParameters(tblIdFut.join(), assignment),
null));
@@ -339,14 +437,59 @@ public class TableManagerTest {
TableManager tableManager = new TableManager(cfrMgr, mm, sm, am, rm, vm);
+ tblManagerFut.complete(tableManager);
+
+ when(mm.get(eq(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"))))
+ .thenReturn(CompletableFuture.completedFuture(null));
+
+ when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invokation -> {
+ Cursor<Entry> cursor = mock(Cursor.class);
+
+ when(cursor.hasNext()).thenReturn(false);
+
+ return cursor;
+ });
+
int tablesBeforeCreation = tableManager.tables().size();
- SchemaTable scmTbl2 = SchemaBuilders.tableBuilder("PUBLIC", tableName).columns(
- SchemaBuilders.column("key", ColumnType.INT64).asNonNull().build(),
- SchemaBuilders.column("val", ColumnType.INT64).asNullable().build()
- ).withPrimaryKey("key").build();
+ cfrMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY).tables().listen(ctx -> {
+ boolean createTbl = ctx.newValue().get(schemaTable.canonicalName()) != null &&
+ ctx.oldValue().get(schemaTable.canonicalName()) == null;
+
+ boolean dropTbl = ctx.oldValue().get(schemaTable.canonicalName()) != null &&
+ ctx.newValue().get(schemaTable.canonicalName()) == null;
+
+ if (!createTbl && !dropTbl)
+ return CompletableFuture.completedFuture(null);
+
+ when(mm.get(eq(new ByteArray(PUBLIC_PREFIX + ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"))))
+ .thenAnswer(invokation -> CompletableFuture.completedFuture(createTbl ? mock(Entry.class) : null));
+
+ when(mm.range(eq(new ByteArray(PUBLIC_PREFIX)), any())).thenAnswer(invokation -> {
+ AtomicBoolean firstRecord = new AtomicBoolean(createTbl);
+
+ Cursor<Entry> cursor = mock(Cursor.class);
+
+ when(cursor.hasNext()).thenAnswer(hasNextInvokation ->
+ firstRecord.compareAndSet(true, false));
+
+ Entry mockEntry = mock(Entry.class);
+
+ when(mockEntry.key()).thenReturn(new ByteArray(PUBLIC_PREFIX +
+ ConfigurationUtil.escape(schemaTable.canonicalName()) + ".name"));
+
+ when(cursor.next()).thenReturn(mockEntry);
+
+ return cursor;
+ });
+
+ if (phaser != null)
+ phaser.arriveAndAwaitAdvance();
+
+ return CompletableFuture.completedFuture(null);
+ });
- Table tbl2 = tableManager.createTable(scmTbl2.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(scmTbl2, tblCh)
+ TableImpl tbl2 = (TableImpl)tableManager.createTable(schemaTable.canonicalName(), tblCh -> SchemaConfigurationConverter.convert(schemaTable, tblCh)
.changeReplicas(1)
.changePartitions(10)
);
@@ -355,7 +498,7 @@ public class TableManagerTest {
assertEquals(tablesBeforeCreation + 1, tableManager.tables().size());
- return tableManager;
+ return tbl2;
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index 3821f26..ef1a901 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -83,6 +83,11 @@ public class DummyInternalTableImpl implements InternalTable {
}
/** {@inheritDoc} */
+ @Override public @NotNull String tableName() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
@Override public CompletableFuture<BinaryRow> get(@NotNull BinaryRow row) {
assert row != null;