You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ko...@apache.org on 2022/10/27 17:25:57 UTC
[ignite-3] branch ignite-3.0.0-beta1 updated: IGNITE-17859 Update indexes on data modifications
This is an automated email from the ASF dual-hosted git repository.
korlov pushed a commit to branch ignite-3.0.0-beta1
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-3.0.0-beta1 by this push:
new 74ebe4471b IGNITE-17859 Update indexes on data modifications
74ebe4471b is described below
commit 74ebe4471b336dcb67360fd748dd9ddecb2b30df
Author: korlov42 <ko...@gridgain.com>
AuthorDate: Thu Oct 27 20:25:52 2022 +0300
IGNITE-17859 Update indexes on data modifications
---
.../internal/binarytuple/BinaryTupleBuilder.java | 7 +-
.../ignite/client/fakes/FakeIgniteTables.java | 4 +-
.../java/org/apache/ignite/internal/util/Lazy.java | 68 ++
.../org/apache/ignite/internal/util/LazyTest.java | 55 ++
.../internal/testframework/IgniteTestUtils.java | 2 +-
modules/index/build.gradle | 1 +
modules/index/pom.xml | 5 +
.../apache/ignite/internal/index/IndexManager.java | 178 ++++-
.../configuration/IndexConfigurationModule.java | 43 -
...nite.internal.configuration.ConfigurationModule | 17 -
.../ignite/internal/index/IndexManagerTest.java | 13 +-
.../checkpoint/CheckpointTimeoutLockTest.java | 4 +-
.../ignite/internal/index/ItIndexManagerTest.java | 69 +-
.../internal/runner/app/ItDataSchemaSyncTest.java | 84 +-
.../ignite/internal/sql/engine/ItDmlTest.java | 28 +
.../ignite/internal/sqllogic/ItSqlLogicTest.java | 2 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +-
.../ignite/internal/schema/BinaryConverter.java | 30 +-
.../ignite/internal/schema/BinaryTupleSchema.java | 3 +-
.../SchemaDistributedConfigurationModule.java | 6 +-
.../ignite/internal/sql/api/SessionImpl.java | 3 +-
.../internal/sql/engine/prepare/PlannerPhase.java | 4 +-
.../internal/sql/engine/StopCalciteModuleTest.java | 3 +-
.../sql/engine/exec/MockedStructuresTest.java | 2 +-
.../planner/AggregateDistinctPlannerTest.java | 28 +-
.../sql/engine/planner/AggregatePlannerTest.java | 44 +-
.../CorrelatedNestedLoopJoinPlannerTest.java | 2 +
.../sql/engine/planner/HashIndexPlannerTest.java | 3 +
.../engine/planner/JoinColocationPlannerTest.java | 2 +
.../sql/engine/planner/LimitOffsetPlannerTest.java | 2 +
.../sql/engine/planner/MergeJoinPlannerTest.java | 2 +
.../planner/ProjectFilterScanMergePlannerTest.java | 3 +
.../engine/planner/SortAggregatePlannerTest.java | 4 +
.../planner/SortedIndexSpoolPlannerTest.java | 2 +
modules/storage-api/build.gradle | 1 -
modules/storage-api/pom.xml | 6 -
.../storage/impl/TestMvPartitionStorage.java | 5 +
modules/storage-page-memory/build.gradle | 1 -
modules/storage-page-memory/pom.xml | 6 -
modules/storage-rocksdb/build.gradle | 1 -
modules/storage-rocksdb/pom.xml | 6 -
modules/table/build.gradle | 1 -
modules/table/pom.xml | 6 -
.../ItAbstractInternalTableScanTest.java | 33 +-
.../ItInternalTableReadOnlyOperationsTest.java | 94 ++-
.../ignite/distributed/ItTablePersistenceTest.java | 5 +-
.../distributed/ItTxDistributedTestSingleNode.java | 39 +-
.../ignite/internal/table/ItColocationTest.java | 2 +-
.../apache/ignite/internal/table/TableImpl.java | 210 ++++-
.../table/distributed/HashIndexLocker.java | 85 ++
.../internal/table/distributed/IndexLocker.java | 62 ++
.../table/distributed/SortedIndexLocker.java | 113 +++
.../internal/table/distributed/TableManager.java | 37 +-
.../distributed/TableSchemaAwareIndexStorage.java | 90 +++
.../table/distributed/raft/PartitionListener.java | 122 +--
.../replicator/PartitionReplicaListener.java | 866 +++++++++------------
.../distributed/storage/InternalTableImpl.java | 36 +-
.../org/apache/ignite/internal/table/Example.java | 4 +-
.../internal/table/InteropOperationsTest.java | 3 +-
.../table/KeyValueBinaryViewOperationsTest.java | 3 +-
.../KeyValueViewOperationsSimpleSchemaTest.java | 3 +-
.../table/RecordBinaryViewOperationsTest.java | 3 +-
.../internal/table/SchemaValidationTest.java | 3 +-
.../apache/ignite/internal/table/TxLocalTest.java | 4 +-
.../table/distributed/TableManagerTest.java | 1 +
.../raft/PartitionCommandListenerTest.java | 188 +++--
.../PartitionReplicaListenerIndexLockingTest.java | 388 +++++++++
.../replication/PartitionReplicaListenerTest.java | 54 +-
.../table/impl/DummyInternalTableImpl.java | 39 +-
.../java/org/apache/ignite/internal/tx/Lock.java | 6 +
.../org/apache/ignite/internal/tx/LockKey.java | 6 +
71 files changed, 2192 insertions(+), 1065 deletions(-)
diff --git a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
index 087385960d..e94af24a01 100644
--- a/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
+++ b/modules/binary-tuple/src/main/java/org/apache/ignite/internal/binarytuple/BinaryTupleBuilder.java
@@ -814,12 +814,17 @@ public class BinaryTupleBuilder {
/** Put element bytes to the buffer extending it if needed. */
private void putElement(ByteBuffer bytes) {
ensure(bytes.remaining());
+
+ int pos = bytes.position();
+
buffer.put(bytes);
+
+ bytes.position(pos);
}
/** Put element bytes to the buffer extending it if needed. */
private void putElement(ByteBuffer bytes, int offset, int length) {
- assert bytes.limit() <= (offset + length);
+ assert bytes.limit() >= (offset + length);
ensure(length);
buffer.put(bytes.asReadOnlyBuffer().position(offset).limit(offset + length));
}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
index 109ad3b1e2..c625175ad0 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeIgniteTables.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.table.Table;
@@ -229,7 +230,8 @@ public class FakeIgniteTables implements IgniteTables, IgniteTablesInternal {
return new TableImpl(
new FakeInternalTable(name, id),
- new FakeSchemaRegistry(history)
+ new FakeSchemaRegistry(history),
+ new HeapLockManager()
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/Lazy.java b/modules/core/src/main/java/org/apache/ignite/internal/util/Lazy.java
new file mode 100644
index 0000000000..e168014049
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/Lazy.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Value which will be initialized at the moment of very first access.
+ *
+ * @param <T> Type of the value.
+ */
+public class Lazy<T> {
+ private static final Supplier<?> EMPTY = () -> {
+ throw new IllegalStateException("Should not be called");
+ };
+
+ private volatile Supplier<T> supplier;
+
+ // This is a safe race, because we follow two simple rules: single read and safe initialization
+ @SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized")
+ private @Nullable T val;
+
+ /**
+ * Creates the lazy value with the given value supplier.
+ *
+ * @param supplier A supplier of the value.
+ */
+ public Lazy(Supplier<@Nullable T> supplier) {
+ this.supplier = supplier;
+ }
+
+ /** Returns the value. */
+ public @Nullable T get() {
+ T v = val;
+
+ if (v == null) {
+ if (supplier != EMPTY) {
+ synchronized (this) {
+ if (supplier != EMPTY) {
+ v = supplier.get();
+ val = v;
+ supplier = (Supplier<T>) EMPTY; // help GC collects objects acquired by supplier's closure
+ }
+ }
+ }
+
+ v = val;
+ }
+
+ return v;
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/LazyTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/LazyTest.java
new file mode 100644
index 0000000000..ac096960ae
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/LazyTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.assertj.core.util.Arrays;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests to validate Lazy value.
+ */
+public class LazyTest {
+ /** Basic test to validate value is computed only once. */
+ @Test
+ public void test() {
+ List<Object> values = Arrays.asList(new Object[]{null, 2, "3"});
+
+ for (Object value : values) {
+ AtomicInteger invocationCounter = new AtomicInteger();
+
+ Lazy<Object> lazy = new Lazy<>(() -> {
+ invocationCounter.incrementAndGet();
+
+ return value;
+ });
+
+ // call get several times to validate value is computed only once
+ assertThat(lazy.get(), equalTo(value));
+ assertThat(lazy.get(), equalTo(value));
+ assertThat(lazy.get(), equalTo(value));
+
+ assertThat(invocationCounter.get(), equalTo(1));
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 9ce1a90f19..329f9161c8 100644
--- a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -408,7 +408,7 @@ public final class IgniteTestUtils {
try {
task.run();
} catch (Throwable e) {
- throw new Exception(e);
+ sneakyThrow(e);
}
return null;
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 1cb16cd745..fc6c32d339 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -25,6 +25,7 @@ dependencies {
implementation project(':ignite-configuration')
implementation project(':ignite-schema')
implementation project(':ignite-transactions')
+ implementation project(':ignite-table')
implementation project(':ignite-configuration')
implementation libs.jetbrains.annotations
testImplementation(testFixtures(project(':ignite-configuration')))
diff --git a/modules/index/pom.xml b/modules/index/pom.xml
index cf8a6ad27b..2588d5a540 100644
--- a/modules/index/pom.xml
+++ b/modules/index/pom.xml
@@ -58,6 +58,11 @@
<artifactId>ignite-transactions</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-table</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.ignite</groupId>
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index 3cd01dfd95..2ca2fcb2ea 100644
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -18,14 +18,18 @@
package org.apache.ignite.internal.index;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.ArrayUtils.STRING_EMPTY_ARRAY;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
import org.apache.ignite.internal.index.event.IndexEvent;
@@ -34,15 +38,26 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.SchemaManager;
+import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.configuration.ExtendedTableConfiguration;
import org.apache.ignite.internal.schema.configuration.TableConfiguration;
import org.apache.ignite.internal.schema.configuration.TablesConfiguration;
+import org.apache.ignite.internal.schema.configuration.index.HashIndexChange;
import org.apache.ignite.internal.schema.configuration.index.HashIndexView;
import org.apache.ignite.internal.schema.configuration.index.IndexColumnView;
import org.apache.ignite.internal.schema.configuration.index.SortedIndexView;
import org.apache.ignite.internal.schema.configuration.index.TableIndexChange;
import org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
import org.apache.ignite.internal.schema.configuration.index.TableIndexView;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.StringUtils;
import org.apache.ignite.lang.ErrorGroups;
@@ -64,6 +79,10 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
/** Common tables and indexes configuration. */
private final TablesConfiguration tablesCfg;
+ private final SchemaManager schemaManager;
+
+ private final TableManager tableManager;
+
/** Busy lock to stop synchronously. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -74,9 +93,13 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
* Constructor.
*
* @param tablesCfg Tables and indexes configuration.
+ * @param schemaManager Schema manager.
+ * @param tableManager Table manager.
*/
- public IndexManager(TablesConfiguration tablesCfg) {
+ public IndexManager(TablesConfiguration tablesCfg, SchemaManager schemaManager, TableManager tableManager) {
this.tablesCfg = Objects.requireNonNull(tablesCfg, "tablesCfg");
+ this.schemaManager = Objects.requireNonNull(schemaManager, "schemaManager");
+ this.tableManager = tableManager;
}
/** {@inheritDoc} */
@@ -85,6 +108,24 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
LOG.debug("Index manager is about to start");
tablesCfg.indexes().listenElements(new ConfigurationListener());
+ tableManager.listen(TableEvent.CREATE, (param, ex) -> {
+ if (ex != null) {
+ return CompletableFuture.completedFuture(false);
+ }
+
+ List<String> pkColumns = Arrays.stream(param.table().schemaView().schema().keyColumns().columns())
+ .map(Column::name)
+ .collect(Collectors.toList());
+
+ String pkName = param.tableName() + "_PK";
+
+ createIndexAsync("PUBLIC", pkName, param.tableName(), false,
+ change -> change.changeUniq(true).convert(HashIndexChange.class)
+ .changeColumnNames(pkColumns.toArray(STRING_EMPTY_ARRAY))
+ );
+
+ return CompletableFuture.completedFuture(false);
+ });
LOG.info("Index manager started");
}
@@ -123,7 +164,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
Consumer<TableIndexChange> indexChange
) {
if (!busyLock.enterBusy()) {
- return CompletableFuture.failedFuture(new NodeStoppingException());
+ return failedFuture(new NodeStoppingException());
}
LOG.debug("Going to create index [schema={}, table={}, index={}]", schemaName, tableName, indexName);
@@ -160,7 +201,7 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
indexListChange.create(indexName, chg);
}).whenComplete((index, th) -> {
if (th != null) {
- LOG.info("Unable to create index [schema={}, table={}, index={}]",
+ LOG.debug("Unable to create index [schema={}, table={}, index={}]",
th, schemaName, tableName, indexName);
if (!failIfExists && idxExist.get()) {
@@ -284,13 +325,19 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
return failedFuture(new NodeStoppingException());
}
- try {
- fireEvent(IndexEvent.DROP, new IndexEventParameters(evt.storageRevision(), idxId), null);
- } finally {
- busyLock.leaveBusy();
- }
-
- return CompletableFuture.completedFuture(null);
+ return tableManager.tableAsync(evt.oldValue().tableId())
+ .thenAccept(table -> {
+ if (table != null) { // in case of DROP TABLE the table will be removed first
+ table.unregisterIndex(idxId);
+ }
+ })
+ .thenRun(() -> {
+ try {
+ fireEvent(IndexEvent.DROP, new IndexEventParameters(evt.storageRevision(), idxId), null);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ });
}
/**
@@ -331,9 +378,27 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
Index<?> index = newIndex(tableId, tableIndexView);
- fireEvent(IndexEvent.CREATE, new IndexEventParameters(causalityToken, index), null);
+ TableRowToIndexKeyConverter tableRowConverter = new TableRowToIndexKeyConverter(
+ schemaManager.schemaRegistry(tableId),
+ index.descriptor().columns().toArray(STRING_EMPTY_ARRAY)
+ );
- return CompletableFuture.completedFuture(null);
+ return tableManager.tableAsync(tableId)
+ .thenAccept(table -> {
+ if (index instanceof HashIndex) {
+ table.registerHashIndex(tableIndexView.id(), tableIndexView.uniq(), tableRowConverter::convert);
+
+ if (tableIndexView.uniq()) {
+ table.pkId(index.id());
+ }
+ } else if (index instanceof SortedIndex) {
+ table.registerSortedIndex(tableIndexView.id(), tableRowConverter::convert);
+ } else {
+ throw new AssertionError("Unknown index type [type=" + index.getClass() + ']');
+ }
+
+ fireEvent(IndexEvent.CREATE, new IndexEventParameters(causalityToken, index), null);
+ });
}
private Index<?> newIndex(UUID tableId, TableIndexView indexView) {
@@ -380,6 +445,95 @@ public class IndexManager extends Producer<IndexEvent, IndexEventParameters> imp
);
}
+ /**
+ * This class encapsulates the logic of conversion from table row to a particular index key.
+ */
+ private static class TableRowToIndexKeyConverter {
+ private final SchemaRegistry registry;
+ private final String[] indexedColumns;
+ private final Object mutex = new Object();
+
+ private volatile VersionedConverter converter = new VersionedConverter(-1,
+ t -> null);
+
+ TableRowToIndexKeyConverter(SchemaRegistry registry, String[] indexedColumns) {
+ this.registry = registry;
+ this.indexedColumns = indexedColumns;
+ }
+
+ public BinaryTuple convert(BinaryRow tableRow) {
+ VersionedConverter converter = this.converter;
+
+ if (converter.version != tableRow.schemaVersion()) {
+ synchronized (mutex) {
+ if (converter.version != tableRow.schemaVersion()) {
+ converter = createConverter(tableRow.schemaVersion());
+
+ this.converter = converter;
+ }
+ }
+ }
+
+ return converter.convert(tableRow);
+ }
+
+ /** Creates converter for given version of the schema. */
+ private VersionedConverter createConverter(int schemaVersion) {
+ SchemaDescriptor descriptor = registry.schema();
+
+ if (descriptor.version() < schemaVersion) {
+ registry.waitLatestSchema();
+ }
+
+ if (descriptor.version() != schemaVersion) {
+ descriptor = registry.schema(schemaVersion);
+ }
+
+ int[] indexedColumns = resolveColumnIndexes(descriptor);
+
+ BinaryTupleSchema tupleSchema = BinaryTupleSchema.createSchema(descriptor, indexedColumns);
+
+ var rowConverter = new BinaryConverter(descriptor, tupleSchema, false);
+
+ return new VersionedConverter(descriptor.version(),
+ row -> new BinaryTuple(tupleSchema, rowConverter.toTuple(row)));
+ }
+
+ private int[] resolveColumnIndexes(SchemaDescriptor descriptor) {
+ int[] result = new int[indexedColumns.length];
+
+ for (int i = 0; i < indexedColumns.length; i++) {
+ Column column = descriptor.column(indexedColumns[i]);
+
+ assert column != null : indexedColumns[i];
+
+ result[i] = column.schemaIndex();
+ }
+
+ return result;
+ }
+
+ /**
+ * Convenient wrapper which glues together a function which actually converts one row to another,
+ * and a version of the schema the function was build upon.
+ */
+ private static class VersionedConverter {
+ private final int version;
+ private final Function<BinaryRow, BinaryTuple> delegate;
+
+ private VersionedConverter(int version,
+ Function<BinaryRow, BinaryTuple> delegate) {
+ this.version = version;
+ this.delegate = delegate;
+ }
+
+ /** Converts the given row to tuple. */
+ public BinaryTuple convert(BinaryRow binaryRow) {
+ return delegate.apply(binaryRow);
+ }
+ }
+ }
+
private class ConfigurationListener implements ConfigurationNamedListListener<TableIndexView> {
/** {@inheritDoc} */
@Override
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/configuration/IndexConfigurationModule.java b/modules/index/src/main/java/org/apache/ignite/internal/index/configuration/IndexConfigurationModule.java
deleted file mode 100644
index 506d9d4c3d..0000000000
--- a/modules/index/src/main/java/org/apache/ignite/internal/index/configuration/IndexConfigurationModule.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.index.configuration;
-
-import java.util.Collection;
-import java.util.List;
-import org.apache.ignite.configuration.annotation.ConfigurationType;
-import org.apache.ignite.internal.configuration.ConfigurationModule;
-import org.apache.ignite.internal.schema.configuration.index.HashIndexConfigurationSchema;
-import org.apache.ignite.internal.schema.configuration.index.SortedIndexConfigurationSchema;
-
-/**
- * {@link ConfigurationModule} for cluster-wide configuration provided by index.
- */
-public class IndexConfigurationModule implements ConfigurationModule {
- @Override
- public ConfigurationType type() {
- return ConfigurationType.DISTRIBUTED;
- }
-
- @Override
- public Collection<Class<?>> polymorphicSchemaExtensions() {
- return List.of(
- HashIndexConfigurationSchema.class,
- SortedIndexConfigurationSchema.class
- );
- }
-}
diff --git a/modules/index/src/main/resources/META-INF/services/org.apache.ignite.internal.configuration.ConfigurationModule b/modules/index/src/main/resources/META-INF/services/org.apache.ignite.internal.configuration.ConfigurationModule
deleted file mode 100644
index 0443aee5a8..0000000000
--- a/modules/index/src/main/resources/META-INF/services/org.apache.ignite.internal.configuration.ConfigurationModule
+++ /dev/null
@@ -1,17 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-org.apache.ignite.internal.index.configuration.IndexConfigurationModule
diff --git a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
index c8f183c317..f8aeda625d 100644
--- a/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
+++ b/modules/index/src/test/java/org/apache/ignite/internal/index/IndexManagerTest.java
@@ -27,6 +27,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
@@ -48,6 +51,7 @@ import org.apache.ignite.internal.configuration.tree.ConverterToMapVisitor;
import org.apache.ignite.internal.configuration.tree.TraversableTreeNode;
import org.apache.ignite.internal.index.event.IndexEvent;
import org.apache.ignite.internal.index.event.IndexEventParameters;
+import org.apache.ignite.internal.schema.SchemaManager;
import org.apache.ignite.internal.schema.configuration.ExtendedTableChange;
import org.apache.ignite.internal.schema.configuration.ExtendedTableConfigurationSchema;
import org.apache.ignite.internal.schema.configuration.TableChange;
@@ -65,6 +69,8 @@ import org.apache.ignite.internal.schema.configuration.index.SortedIndexChange;
import org.apache.ignite.internal.schema.configuration.index.SortedIndexConfigurationSchema;
import org.apache.ignite.internal.schema.configuration.index.TableIndexConfiguration;
import org.apache.ignite.internal.schema.configuration.storage.UnknownDataStorageConfigurationSchema;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteInternalException;
@@ -117,7 +123,12 @@ public class IndexManagerTest {
tablesConfig = confRegistry.getConfiguration(TablesConfiguration.KEY);
- indexManager = new IndexManager(tablesConfig);
+ TableManager tableManagerMock = mock(TableManager.class);
+
+ when(tableManagerMock.tableAsync(any(UUID.class)))
+ .thenReturn(CompletableFuture.completedFuture(mock(TableImpl.class)));
+
+ indexManager = new IndexManager(tablesConfig, mock(SchemaManager.class), tableManagerMock);
indexManager.start();
tablesConfig.tables().change(tableChange -> tableChange.create("tName", chg -> {
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
index e8687a2bcc..ee92b0955c 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
@@ -161,7 +161,7 @@ public class CheckpointTimeoutLockTest {
ExecutionException exception = assertThrows(ExecutionException.class, () -> readLockFuture1.get(100, MILLISECONDS));
- assertThat(exception.getCause().getCause(), instanceOf(CheckpointReadLockTimeoutException.class));
+ assertThat(exception.getCause(), instanceOf(CheckpointReadLockTimeoutException.class));
} finally {
writeUnlock(readWriteLock0);
writeUnlock(readWriteLock1);
@@ -215,7 +215,7 @@ public class CheckpointTimeoutLockTest {
ExecutionException exception = assertThrows(ExecutionException.class, () -> readLockFuture1.get(100, MILLISECONDS));
- assertThat(exception.getCause().getCause(), instanceOf(CheckpointReadLockTimeoutException.class));
+ assertThat(exception.getCause(), instanceOf(CheckpointReadLockTimeoutException.class));
writeUnlock(readWriteLock0);
writeUnlock(readWriteLock1);
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
index d3516ec41a..eb42a4cf75 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexManagerTest.java
@@ -17,14 +17,14 @@
package org.apache.ignite.internal.index;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.sameInstance;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.index.event.IndexEvent;
@@ -50,43 +50,62 @@ public class ItIndexManagerTest extends AbstractBasicIntegrationTest {
@Test
public void eventsAreFiredWhenIndexesCreatedAndDropped() {
Ignite ignite = CLUSTER_NODES.get(0);
+ IndexManager indexManager = ((IgniteImpl) ignite).indexManager();
+
+ CompletableFuture<IndexEventParameters> pkCreatedFuture = registerListener(indexManager, IndexEvent.CREATE);
sql("CREATE TABLE tname (c1 INT PRIMARY KEY, c2 INT, c3 INT)");
TableImpl table = (TableImpl) ignite.tables().table("tname");
- IndexManager indexManager = ((IgniteImpl) ignite).indexManager();
+ {
+ Index<?> index = await(pkCreatedFuture).index();
- AtomicReference<IndexEventParameters> createEventParamHolder = new AtomicReference<>();
- AtomicReference<IndexEventParameters> dropEventParamHolder = new AtomicReference<>();
+ assertThat(index, notNullValue());
+ assertThat(index.tableId(), equalTo(table.tableId()));
+ assertThat(index.descriptor().columns(), hasItems("C1"));
+ assertThat(index.name(), equalTo("TNAME_PK"));
+ assertThat(index.name(), equalTo(index.descriptor().name()));
+ }
- indexManager.listen(IndexEvent.CREATE, (param, th) -> {
- createEventParamHolder.set(param);
-
- return CompletableFuture.completedFuture(true);
- });
- indexManager.listen(IndexEvent.DROP, (param, th) -> {
- dropEventParamHolder.set(param);
-
- return CompletableFuture.completedFuture(true);
- });
+ CompletableFuture<IndexEventParameters> indexCreatedFuture = registerListener(indexManager, IndexEvent.CREATE);
indexManager.createIndexAsync("PUBLIC", "INAME", "TNAME", true, tableIndexChange ->
tableIndexChange.convert(HashIndexChange.class).changeColumnNames("C3", "C2")).join();
- Index<?> index = createEventParamHolder.get().index();
+ UUID createdIndexId;
+ {
+ Index<?> index = await(indexCreatedFuture).index();
+ createdIndexId = index.id();
- assertThat(index, notNullValue());
- assertThat(index.tableId(), equalTo(table.tableId()));
- assertThat(index.descriptor().columns(), hasItems("C3", "C2"));
- assertThat(index.name(), equalTo("INAME"));
- assertThat(index.name(), equalTo(index.descriptor().name()));
- assertThat(createEventParamHolder.get(), notNullValue());
- assertThat(index, sameInstance(createEventParamHolder.get().index()));
+ assertThat(index, notNullValue());
+ assertThat(index.tableId(), equalTo(table.tableId()));
+ assertThat(index.descriptor().columns(), hasItems("C3", "C2"));
+ assertThat(index.name(), equalTo("INAME"));
+ assertThat(index.name(), equalTo(index.descriptor().name()));
+ }
+
+ CompletableFuture<IndexEventParameters> indexDroppedFuture = registerListener(indexManager, IndexEvent.DROP);
indexManager.dropIndexAsync("PUBLIC", "INAME", true).join();
- assertThat(dropEventParamHolder.get(), notNullValue());
- assertThat(index.id(), sameInstance(dropEventParamHolder.get().indexId()));
+ {
+ IndexEventParameters params = await(indexDroppedFuture);
+
+ assertThat(params, notNullValue());
+ assertThat(params.indexId(), equalTo(createdIndexId));
+ }
+ }
+
+ private CompletableFuture<IndexEventParameters> registerListener(IndexManager indexManager, IndexEvent event) {
+ CompletableFuture<IndexEventParameters> paramFuture = new CompletableFuture<>();
+
+ indexManager.listen(event, (param, th) -> {
+ paramFuture.complete(param);
+
+ return CompletableFuture.completedFuture(true);
+ });
+
+ return paramFuture;
}
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
index 4b92592ef2..299142d228 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItDataSchemaSyncTest.java
@@ -18,17 +18,17 @@
package org.apache.ignite.internal.runner.app;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgnitionManager;
import org.apache.ignite.internal.app.IgniteImpl;
@@ -38,11 +38,12 @@ import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.Session;
-import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -50,6 +51,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
* There is a test of table schema synchronization.
*/
@ExtendWith(WorkDirectoryExtension.class)
+@Disabled
public class ItDataSchemaSyncTest extends IgniteAbstractTest {
/**
* Table name.
@@ -166,64 +168,22 @@ public class ItDataSchemaSyncTest extends IgniteAbstractTest {
IgniteTestUtils.waitForCondition(() -> tableOnNode.schemaView().lastSchemaVersion() == 2, 10_000);
}
- TableImpl table1 = (TableImpl) ignite1.tables().table(TABLE_NAME);
-
- for (int i = 10; i < 20; i++) {
- table.recordView().insert(
- null,
- Tuple.create()
- .set("key", (long) i)
- .set("valInt", i)
- .set("valStr", "str_" + i)
- .set("valStr2", "str2_" + i)
- );
- }
-
- CompletableFuture<?> insertFut = IgniteTestUtils.runAsync(() ->
- table1.recordView().insert(
- null,
- Tuple.create()
- .set("key", 0L)
- .set("valInt", 0)
- .set("valStr", "str_" + 0)
- .set("valStr2", "str2_" + 0)
- ));
-
- CompletableFuture<?> getFut = IgniteTestUtils.runAsync(() -> {
- table1.recordView().get(null, Tuple.create().set("key", 10L));
- });
-
- CompletableFuture<?> checkDefaultFut = IgniteTestUtils.runAsync(() -> {
- assertEquals("default",
- table1.recordView().get(null, Tuple.create().set("key", 0L))
- .value("valStr2"));
- });
-
- assertEquals(1, table1.schemaView().lastSchemaVersion());
-
- assertFalse(getFut.isDone());
- assertFalse(insertFut.isDone());
- assertFalse(checkDefaultFut.isDone());
-
- listenerInhibitor.stopInhibit();
-
- getFut.get(10, TimeUnit.SECONDS);
- insertFut.get(10, TimeUnit.SECONDS);
- checkDefaultFut.get(10, TimeUnit.SECONDS);
-
- for (Ignite node : clusterNodes) {
- Table tableOnNode = node.tables().table(TABLE_NAME);
-
- for (int i = 0; i < 20; i++) {
- Tuple row = tableOnNode.recordView().get(null, Tuple.create().set("key", (long) i));
-
- assertNotNull(row);
-
- assertEquals(i, row.intValue("valInt"));
- assertEquals("str_" + i, row.value("valStr"));
- assertEquals(i < 10 ? "default" : ("str2_" + i), row.value("valStr2"));
- }
- }
+ CompletableFuture<?> insertFut = IgniteTestUtils.runAsync(() -> {
+ for (int i = 10; i < 20; i++) {
+ table.recordView().insert(
+ null,
+ Tuple.create()
+ .set("key", (long) i)
+ .set("valInt", i)
+ .set("valStr", "str_" + i)
+ .set("valStr2", "str2_" + i)
+ );
+ }
+ }
+ );
+
+ IgniteException ex = assertThrows(IgniteException.class, () -> await(insertFut));
+ assertThat(ex.getMessage(), containsString("Replication is timed out"));
}
/**
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
index 44320a6bbb..ab5665a40f 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItDmlTest.java
@@ -69,6 +69,34 @@ public class ItDmlTest extends AbstractBasicIntegrationTest {
IgniteTestUtils.setFieldValue(Commons.class, "implicitPkEnabled", null);
}
+ @Test
+ public void pkConstraintConsistencyTest() {
+ sql("CREATE TABLE my (id INT PRIMARY KEY, val INT)");
+ sql("INSERT INTO my VALUES (?, ?)", 0, 1);
+ assertQuery("SELECT val FROM my WHERE id = 0")
+ .returns(1)
+ .check();
+
+ {
+ SqlException ex = assertThrows(SqlException.class, () -> sql("INSERT INTO my VALUES (?, ?)", 0, 2));
+
+ assertEquals(ex.code(), Sql.DUPLICATE_KEYS_ERR);
+ }
+
+ sql("DELETE FROM my WHERE id=?", 0);
+
+ sql("INSERT INTO my VALUES (?, ?)", 0, 2);
+ assertQuery("SELECT val FROM my WHERE id = 0")
+ .returns(2)
+ .check();
+
+ {
+ SqlException ex = assertThrows(SqlException.class, () -> sql("INSERT INTO my VALUES (?, ?)", 0, 3));
+
+ assertEquals(ex.code(), Sql.DUPLICATE_KEYS_ERR);
+ }
+ }
+
@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-16529")
public void mergeOpChangePrimaryKey() {
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
index 60948e2413..3fb955e8c0 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sqllogic/ItSqlLogicTest.java
@@ -57,6 +57,7 @@ import org.apache.ignite.sql.Session;
import org.apache.ignite.table.Table;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DynamicContainer;
import org.junit.jupiter.api.DynamicNode;
import org.junit.jupiter.api.DynamicTest;
@@ -108,6 +109,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
@ExtendWith({WorkDirectoryExtension.class, SystemPropertiesExtension.class})
@WithSystemProperty(key = "IMPLICIT_PK_ENABLED", value = "true")
@SqlLogicTestEnvironment(scriptsRoot = "src/integrationTest/sql")
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-18000")
public class ItSqlLogicTest {
private static final String SQL_LOGIC_TEST_INCLUDE_SLOW = "SQL_LOGIC_TEST_INCLUDE_SLOW";
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 4a9f9e97f3..6e4d0e429d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -381,7 +381,7 @@ public class IgniteImpl implements Ignite {
clock
);
- indexManager = new IndexManager(tablesConfiguration);
+ indexManager = new IndexManager(tablesConfiguration, schemaManager, distributedTblMgr);
qryEngine = new SqlQueryProcessor(
registry,
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
index ff88006b12..f1a0caf746 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryConverter.java
@@ -45,6 +45,8 @@ public class BinaryConverter {
/** Tuple schema. */
private final BinaryTupleSchema tupleSchema;
+ private final boolean skipKey;
+
/** Row wrapper that allows direct access to variable-length values. */
private class RowHelper extends Row {
RowHelper(SchemaDescriptor descriptor, BinaryRow row) {
@@ -72,10 +74,16 @@ public class BinaryConverter {
*
* @param descriptor Row schema.
* @param tupleSchema Tuple schema.
+ * @param skipKey Whether to build tuple from value part only.
*/
- public BinaryConverter(SchemaDescriptor descriptor, BinaryTupleSchema tupleSchema) {
+ public BinaryConverter(
+ SchemaDescriptor descriptor,
+ BinaryTupleSchema tupleSchema,
+ boolean skipKey
+ ) {
this.descriptor = descriptor;
this.tupleSchema = tupleSchema;
+ this.skipKey = skipKey;
}
/**
@@ -85,7 +93,7 @@ public class BinaryConverter {
* @return Row converter.
*/
public static BinaryConverter forRow(SchemaDescriptor descriptor) {
- return new BinaryConverter(descriptor, BinaryTupleSchema.createRowSchema(descriptor));
+ return new BinaryConverter(descriptor, BinaryTupleSchema.createRowSchema(descriptor), false);
}
/**
@@ -95,7 +103,17 @@ public class BinaryConverter {
* @return Key converter.
*/
public static BinaryConverter forKey(SchemaDescriptor descriptor) {
- return new BinaryConverter(descriptor, BinaryTupleSchema.createKeySchema(descriptor));
+ return new BinaryConverter(descriptor, BinaryTupleSchema.createKeySchema(descriptor), false);
+ }
+
+ /**
+ * Factory method for value converter.
+ *
+ * @param descriptor Row schema.
+ * @return Key converter.
+ */
+ public static BinaryConverter forValue(SchemaDescriptor descriptor) {
+ return new BinaryConverter(descriptor, BinaryTupleSchema.createValueSchema(descriptor), true);
}
/**
@@ -117,6 +135,9 @@ public class BinaryConverter {
NativeTypeSpec typeSpec = elt.typeSpec;
int columnIndex = tupleSchema.columnIndex(elementIndex);
+ if (skipKey) {
+ columnIndex += descriptor.keyColumns().length();
+ }
if (row.hasNullValue(columnIndex, typeSpec)) {
hasNulls = true;
} else if (typeSpec.fixedLength()) {
@@ -134,6 +155,9 @@ public class BinaryConverter {
NativeTypeSpec typeSpec = elt.typeSpec;
int columnIndex = tupleSchema.columnIndex(elementIndex);
+ if (skipKey) {
+ columnIndex += descriptor.keyColumns().length();
+ }
if (row.hasNullValue(columnIndex, typeSpec)) {
builder.appendNull();
continue;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
index 3da88e00f6..4df71bdeca 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryTupleSchema.java
@@ -235,10 +235,11 @@ public class BinaryTupleSchema {
Element[] elements = new Element[columns.length];
boolean hasNullables = false;
+ int idx = 0;
for (int i : columns) {
Column column = descriptor.column(i);
boolean nullable = column.nullable();
- elements[i] = new Element(column.type(), nullable);
+ elements[idx++] = new Element(column.type(), nullable);
hasNullables |= nullable;
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java
index f37861ebfc..272bf79052 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/configuration/SchemaDistributedConfigurationModule.java
@@ -29,8 +29,10 @@ import org.apache.ignite.internal.configuration.ConfigurationModule;
import org.apache.ignite.internal.schema.configuration.defaultvalue.ConstantValueDefaultConfigurationSchema;
import org.apache.ignite.internal.schema.configuration.defaultvalue.FunctionCallDefaultConfigurationSchema;
import org.apache.ignite.internal.schema.configuration.defaultvalue.NullValueDefaultConfigurationSchema;
+import org.apache.ignite.internal.schema.configuration.index.HashIndexConfigurationSchema;
import org.apache.ignite.internal.schema.configuration.index.IndexValidator;
import org.apache.ignite.internal.schema.configuration.index.IndexValidatorImpl;
+import org.apache.ignite.internal.schema.configuration.index.SortedIndexConfigurationSchema;
import org.apache.ignite.internal.schema.configuration.storage.KnownDataStorage;
import org.apache.ignite.internal.schema.configuration.storage.KnownDataStorageValidator;
import org.apache.ignite.internal.schema.configuration.storage.UnknownDataStorageConfigurationSchema;
@@ -68,7 +70,9 @@ public class SchemaDistributedConfigurationModule implements ConfigurationModule
UnknownDataStorageConfigurationSchema.class,
ConstantValueDefaultConfigurationSchema.class,
FunctionCallDefaultConfigurationSchema.class,
- NullValueDefaultConfigurationSchema.class
+ NullValueDefaultConfigurationSchema.class,
+ HashIndexConfigurationSchema.class,
+ SortedIndexConfigurationSchema.class
);
}
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
index 55a89bb227..761b77f14a 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/api/SessionImpl.java
@@ -27,6 +27,7 @@ import it.unimi.dsi.fastutil.longs.LongArrayList;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
@@ -176,7 +177,7 @@ public class SessionImpl implements Session {
);
result.whenComplete((rs, th) -> {
- if (IgniteExceptionUtils.getIgniteErrorCode(th) == SESSION_NOT_FOUND_ERR) {
+ if (Objects.equals(IgniteExceptionUtils.getIgniteErrorCode(th), SESSION_NOT_FOUND_ERR)) {
closeInternal();
}
});
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
index 2026bd50ad..d2af1a6ac5 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/prepare/PlannerPhase.java
@@ -63,7 +63,6 @@ import org.apache.ignite.internal.sql.engine.rule.TableFunctionScanConverterRule
import org.apache.ignite.internal.sql.engine.rule.TableModifyConverterRule;
import org.apache.ignite.internal.sql.engine.rule.UnionConverterRule;
import org.apache.ignite.internal.sql.engine.rule.ValuesConverterRule;
-import org.apache.ignite.internal.sql.engine.rule.logical.ExposeIndexRule;
import org.apache.ignite.internal.sql.engine.rule.logical.FilterScanMergeRule;
import org.apache.ignite.internal.sql.engine.rule.logical.LogicalOrToUnionRule;
import org.apache.ignite.internal.sql.engine.rule.logical.ProjectScanMergeRule;
@@ -189,7 +188,8 @@ public enum PlannerPhase {
b.operand(LogicalSort.class).anyInputs())
.toRule(),
- ExposeIndexRule.INSTANCE,
+ // TODO: uncomment after IGNITE-17748
+ // ExposeIndexRule.INSTANCE,
ProjectScanMergeRule.TABLE_SCAN,
ProjectScanMergeRule.INDEX_SCAN,
FilterSpoolMergeToSortedIndexSpoolRule.INSTANCE,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index f3628ad01e..58fc2974fc 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -72,6 +72,7 @@ import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.NodeStoppingException;
@@ -162,7 +163,7 @@ public class StopCalciteModuleTest {
doAnswer(invocation -> {
EventListener<TableEventParameters> clo = (EventListener<TableEventParameters>) invocation.getArguments()[1];
- clo.notify(new TableEventParameters(0, UUID.randomUUID(), "TEST", new TableImpl(tbl, schemaReg)),
+ clo.notify(new TableEventParameters(0, UUID.randomUUID(), "TEST", new TableImpl(tbl, schemaReg, new HeapLockManager())),
null);
return null;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index 0cad686300..66bef99e1e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -247,7 +247,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
tblManager = mockManagers();
- idxManager = new IndexManager(tblsCfg);
+ idxManager = new IndexManager(tblsCfg, schemaManager, tblManager);
idxManager.start();
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregateDistinctPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregateDistinctPlannerTest.java
index a6ac4b64d3..d1785ed9df 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregateDistinctPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregateDistinctPlannerTest.java
@@ -24,17 +24,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.ignite.internal.sql.engine.rel.IgniteAggregate;
-import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapAggregateBase;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
-import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceAggregateBase;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleAggregateBase;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate;
-import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -76,20 +72,22 @@ public class AggregateDistinctPlannerTest extends AbstractAggregatePlannerTest {
assertTrue(nullOrEmpty(rdcAgg.getAggregateCalls()), "Invalid plan\n" + RelOptUtil.toString(phys));
assertTrue(nullOrEmpty(mapAgg.getAggCallList()), "Invalid plan\n" + RelOptUtil.toString(phys));
- if (algo == AggregateAlgorithm.SORT) {
- assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
- }
+ // TODO: uncomment after IGNITE-17748"
+ // if (algo == AggregateAlgorithm.SORT) {
+ // assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
+ // }
}
enum AggregateAlgorithm {
- SORT(
- IgniteSingleSortAggregate.class,
- IgniteMapSortAggregate.class,
- IgniteReduceSortAggregate.class,
- "HashSingleAggregateConverterRule",
- "HashMapReduceAggregateConverterRule",
- "SortSingleAggregateConverterRule"
- ),
+ // TODO: uncomment after IGNITE-17748
+ // SORT(
+ // IgniteSingleSortAggregate.class,
+ // IgniteMapSortAggregate.class,
+ // IgniteReduceSortAggregate.class,
+ // "HashSingleAggregateConverterRule",
+ // "HashMapReduceAggregateConverterRule",
+ // "SortSingleAggregateConverterRule"
+ // ),
HASH(
IgniteSingleHashAggregate.class,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
index dbdc9fc63d..f1498d6fe3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/AggregatePlannerTest.java
@@ -37,18 +37,13 @@ import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.sql.SqlExplainLevel;
import org.apache.calcite.sql.fun.SqlAvgAggFunction;
import org.apache.ignite.internal.sql.engine.rel.IgniteAggregate;
-import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
-import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapAggregateBase;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapHashAggregate;
-import org.apache.ignite.internal.sql.engine.rel.agg.IgniteMapSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceAggregateBase;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceHashAggregate;
-import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleAggregateBase;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate;
-import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
@@ -96,9 +91,10 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
first(agg.getAggCallList()).getAggregation(),
IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
- if (algo == AggregateAlgorithm.SORT) {
- assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
- }
+ // TODO: uncomment after IGNITE-17748
+ // if (algo == AggregateAlgorithm.SORT) {
+ // assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
+ // }
}
/**
@@ -133,9 +129,10 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
first(agg.getAggCallList()).getAggregation(),
IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
- if (algo == AggregateAlgorithm.SORT) {
- assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
- }
+ // TODO: uncomment after IGNITE-17748
+ // if (algo == AggregateAlgorithm.SORT) {
+ // assertNotNull(findFirstNode(phys, byClass(IgniteIndexScan.class)));
+ // }
}
/**
@@ -177,9 +174,10 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
first(mapAgg.getAggCallList()).getAggregation(),
IsInstanceOf.instanceOf(SqlAvgAggFunction.class));
- if (algo == AggregateAlgorithm.SORT) {
- assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
- }
+ // TODO: uncomment after IGNITE-17748
+ // if (algo == AggregateAlgorithm.SORT) {
+ // assertNotNull(findFirstNode(phys, byClass(IgniteSort.class)));
+ // }
}
/**
@@ -339,20 +337,22 @@ public class AggregatePlannerTest extends AbstractAggregatePlannerTest {
private static Stream<Arguments> provideAlgoAndDistribution() {
return Stream.of(
- Arguments.of(AggregateAlgorithm.SORT, IgniteDistributions.broadcast()),
- Arguments.of(AggregateAlgorithm.SORT, IgniteDistributions.random()),
+ // TODO: uncomment after IGNITE-17748
+ // Arguments.of(AggregateAlgorithm.SORT, IgniteDistributions.broadcast()),
+ // Arguments.of(AggregateAlgorithm.SORT, IgniteDistributions.random()),
Arguments.of(AggregateAlgorithm.HASH, IgniteDistributions.broadcast()),
Arguments.of(AggregateAlgorithm.HASH, IgniteDistributions.random())
);
}
enum AggregateAlgorithm {
- SORT(
- IgniteSingleSortAggregate.class,
- IgniteMapSortAggregate.class,
- IgniteReduceSortAggregate.class,
- "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
- ),
+ // TODO: uncomment after IGNITE-17748
+ // SORT(
+ // IgniteSingleSortAggregate.class,
+ // IgniteMapSortAggregate.class,
+ // IgniteReduceSortAggregate.class,
+ // "HashSingleAggregateConverterRule", "HashMapReduceAggregateConverterRule"
+ // ),
HASH(
IgniteSingleHashAggregate.class,
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
index 53bce77d6d..85b4373a1e 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -46,6 +47,7 @@ public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
* Check equi-join. CorrelatedNestedLoopJoinTest is applicable for it.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void testValidIndexExpressions() throws Exception {
IgniteSchema publicSchema = new IgniteSchema("PUBLIC");
IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
index 112bd4ca89..b636c1b3a9 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexPlannerTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test;
*/
public class HashIndexPlannerTest extends AbstractPlannerTest {
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void hashIndexIsAppliedForEquiCondition() throws Exception {
var indexName = "VAL_HASH_IDX";
@@ -90,6 +92,7 @@ public class HashIndexPlannerTest extends AbstractPlannerTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void hashIndexIsAppliedForComplexConditions() throws Exception {
var indexName = "VAL_HASH_IDX";
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
index 8e26f048ac..a5544491ce 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/JoinColocationPlannerTest.java
@@ -38,11 +38,13 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* Test suite to verify join colocation.
*/
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public class JoinColocationPlannerTest extends AbstractPlannerTest {
/**
* Join of the same tables with a simple affinity is expected to be colocated.
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/LimitOffsetPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/LimitOffsetPlannerTest.java
index 2878881803..1454950aa3 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/LimitOffsetPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/LimitOffsetPlannerTest.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
import org.apache.ignite.internal.util.ArrayUtils;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -73,6 +74,7 @@ public class LimitOffsetPlannerTest extends AbstractPlannerTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void testOrderOfRels() throws Exception {
IgniteSchema publicSchema = createSchemaWithTable(IgniteDistributions.random());
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
index c96ce544e7..5341f07477 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/MergeJoinPlannerTest.java
@@ -32,9 +32,11 @@ import org.apache.ignite.internal.sql.engine.rel.IgniteSort;
import org.apache.ignite.internal.sql.engine.rel.IgniteTableScan;
import org.apache.ignite.internal.sql.engine.schema.IgniteSchema;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/** MergeJoin planner test. */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public class MergeJoinPlannerTest extends AbstractPlannerTest {
/** Only MergeJoin encourage. */
private static final String[] DISABLED_RULES = {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ProjectFilterScanMergePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ProjectFilterScanMergePlannerTest.java
index 600d46a7f4..c73a87cad8 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ProjectFilterScanMergePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/ProjectFilterScanMergePlannerTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
@@ -87,6 +88,7 @@ public class ProjectFilterScanMergePlannerTest extends AbstractPlannerTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void testProjectFilterMergeIndex() throws Exception {
// Test project and filter merge into index scan.
TestTable tbl = ((TestTable) publicSchema.getTable("TBL"));
@@ -114,6 +116,7 @@ public class ProjectFilterScanMergePlannerTest extends AbstractPlannerTest {
}
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void testIdentityFilterMergeIndex() throws Exception {
// Test project and filter merge into index scan.
TestTable tbl = ((TestTable) publicSchema.getTable("TBL"));
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortAggregatePlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortAggregatePlannerTest.java
index 7df3ccd755..b47fb488cb 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortAggregatePlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortAggregatePlannerTest.java
@@ -41,6 +41,7 @@ import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
@@ -76,6 +77,7 @@ public class SortAggregatePlannerTest extends AbstractAggregatePlannerTest {
/** Checks if already sorted input exist and involved [Map|Reduce]SortAggregate. */
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void testNoSortAppendingWithCorrectCollation() throws Exception {
RelFieldCollation coll = new RelFieldCollation(1, RelFieldCollation.Direction.DESCENDING);
@@ -111,6 +113,7 @@ public class SortAggregatePlannerTest extends AbstractAggregatePlannerTest {
* @throws Exception If failed.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void collationPermuteSingle() throws Exception {
IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
@@ -159,6 +162,7 @@ public class SortAggregatePlannerTest extends AbstractAggregatePlannerTest {
* @throws Exception If failed.
*/
@Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public void collationPermuteMapReduce() throws Exception {
IgniteTypeFactory f = new IgniteTypeFactory(IgniteTypeSystem.INSTANCE);
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
index cebf23eda7..d5d2c2f257 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
@@ -42,12 +42,14 @@ import org.apache.ignite.internal.sql.engine.trait.IgniteDistributions;
import org.apache.ignite.internal.sql.engine.trait.TraitUtils;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeSystem;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* SortedIndexSpoolPlannerTest.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-17748")
public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
/**
* Check equi-join on not colocated fields. CorrelatedNestedLoopJoinTest is applicable for this case only with IndexSpool.
diff --git a/modules/storage-api/build.gradle b/modules/storage-api/build.gradle
index 0af480340a..2811f42c16 100644
--- a/modules/storage-api/build.gradle
+++ b/modules/storage-api/build.gradle
@@ -32,7 +32,6 @@ dependencies {
testAnnotationProcessor project(":ignite-configuration-annotation-processor")
testImplementation project(':ignite-core')
testImplementation project(':ignite-configuration')
- testImplementation project(':ignite-index')
testImplementation project(':ignite-schema')
testImplementation(testFixtures(project(':ignite-schema')))
testImplementation(testFixtures(project(':ignite-core')))
diff --git a/modules/storage-api/pom.xml b/modules/storage-api/pom.xml
index c51defcc35..8dcd25bde7 100644
--- a/modules/storage-api/pom.xml
+++ b/modules/storage-api/pom.xml
@@ -87,12 +87,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-index</artifactId>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-schema</artifactId>
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 50e27bd297..498010a0ce 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -411,4 +411,9 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
public void close() throws Exception {
// No-op.
}
+
+ /** Removes all entries from this storage. */
+ public void clear() {
+ map.clear();
+ }
}
diff --git a/modules/storage-page-memory/build.gradle b/modules/storage-page-memory/build.gradle
index 62f14d6e73..39f5d50694 100644
--- a/modules/storage-page-memory/build.gradle
+++ b/modules/storage-page-memory/build.gradle
@@ -32,7 +32,6 @@ dependencies {
annotationProcessor project(':ignite-configuration-annotation-processor')
testImplementation project(':ignite-core')
- testImplementation project(':ignite-index')
testImplementation project(':ignite-storage-api')
testImplementation project(':ignite-configuration')
testImplementation project(':ignite-transactions')
diff --git a/modules/storage-page-memory/pom.xml b/modules/storage-page-memory/pom.xml
index bd30d25324..3927fcca57 100644
--- a/modules/storage-page-memory/pom.xml
+++ b/modules/storage-page-memory/pom.xml
@@ -73,12 +73,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-index</artifactId>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
diff --git a/modules/storage-rocksdb/build.gradle b/modules/storage-rocksdb/build.gradle
index c73f6a39c3..5a41f81a29 100644
--- a/modules/storage-rocksdb/build.gradle
+++ b/modules/storage-rocksdb/build.gradle
@@ -33,7 +33,6 @@ dependencies {
testAnnotationProcessor project(':ignite-configuration-annotation-processor')
testImplementation project(':ignite-core')
- testImplementation project(':ignite-index')
testImplementation(testFixtures(project(':ignite-core')))
testImplementation project(':ignite-configuration')
testImplementation(testFixtures(project(':ignite-configuration')))
diff --git a/modules/storage-rocksdb/pom.xml b/modules/storage-rocksdb/pom.xml
index e5b08b84e0..ad6ee1a38e 100644
--- a/modules/storage-rocksdb/pom.xml
+++ b/modules/storage-rocksdb/pom.xml
@@ -73,12 +73,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-index</artifactId>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-core</artifactId>
diff --git a/modules/table/build.gradle b/modules/table/build.gradle
index 4db79de63b..5cfb473374 100644
--- a/modules/table/build.gradle
+++ b/modules/table/build.gradle
@@ -44,7 +44,6 @@ dependencies {
testImplementation project(':ignite-storage-api')
testImplementation project(':ignite-storage-page-memory')
testImplementation project(':ignite-storage-rocksdb')
- testImplementation project(':ignite-index')
testImplementation project(':ignite-network')
testImplementation project(':ignite-core')
testImplementation project(':ignite-raft')
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index a8543ad226..ac68fcaa34 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -163,12 +163,6 @@
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.apache.ignite</groupId>
- <artifactId>ignite-index</artifactId>
- <scope>test</scope>
- </dependency>
-
<dependency>
<groupId>org.apache.ignite</groupId>
<artifactId>ignite-network</artifactId>
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 47218e7f57..155a7e8b9d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -27,13 +27,12 @@ import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
+import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
@@ -46,7 +45,10 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
@@ -56,8 +58,6 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.util.ByteUtils;
-import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
@@ -71,6 +71,12 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(MockitoExtension.class)
public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest {
+ private static final SchemaDescriptor ROW_SCHEMA = new SchemaDescriptor(
+ 1,
+ new Column[]{new Column("key", NativeTypes.stringOf(100), false)},
+ new Column[]{new Column("val", NativeTypes.stringOf(100), false)}
+ );
+
/** Mock partition storage. */
@Mock
private MvPartitionStorage mockStorage;
@@ -352,18 +358,13 @@ public abstract class ItAbstractInternalTableScanTest extends IgniteAbstractTest
*
* @param entryKey Key.
* @param entryVal Value
- * @return {@link DataRow} based on given key and value.
- * @throws java.io.IOException If failed to close output stream that was used to convertation.
+ * @return {@link BinaryRow} based on given key and value.
*/
- private static @NotNull BinaryRow prepareRow(@NotNull String entryKey, @NotNull String entryVal) throws IOException {
- byte[] keyBytes = ByteUtils.toBytes(entryKey);
-
- try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
- outputStream.write(keyBytes);
- outputStream.write(ByteUtils.toBytes(entryVal));
-
- return new ByteBufferRow(keyBytes);
- }
+ private static BinaryRow prepareRow(String entryKey, String entryVal) {
+ return new RowAssembler(ROW_SCHEMA, 1, 1)
+ .appendString(Objects.requireNonNull(entryKey, "entryKey"))
+ .appendString(Objects.requireNonNull(entryVal, "entryVal"))
+ .build();
}
/**
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
index 4f3920c9de..9e201b9958 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyOperationsTest.java
@@ -29,12 +29,12 @@ import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
@@ -44,9 +44,9 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.PartitionTimestampCursor;
-import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlySingleRowReplicaRequest;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
@@ -58,7 +58,6 @@ import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.function.Executable;
import org.mockito.Mock;
-import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
/**
@@ -87,6 +86,9 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
@Mock
private InternalTransaction readOnlyTx;
+ @Mock
+ private ReplicaService replicaService;
+
/** Internal table to test. */
private InternalTable internalTbl;
@@ -95,9 +97,7 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
*/
@BeforeEach
public void setUp(TestInfo testInfo) {
- internalTbl = new DummyInternalTableImpl(Mockito.mock(ReplicaService.class), mockStorage);
-
- mockStorage(List.of(ROW_1, ROW_2));
+ internalTbl = new DummyInternalTableImpl(replicaService, mockStorage);
lenient().when(readOnlyTx.isReadOnly()).thenReturn(true);
lenient().when(readOnlyTx.readTimestamp()).thenReturn(CLOCK.now());
@@ -105,27 +105,37 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
@Test
public void testReadOnlyGetNonExistingKeyWithReadTimestamp() {
+ mockReadOnlySingleRowRequest();
+
assertNull(internalTbl.get(createKeyRow(0), CLOCK.now(), mock(ClusterNode.class)).join());
}
@Test
public void testReadOnlyGetNonExistingKeyWithTx() {
+ mockReadOnlySingleRowRequest();
+
assertNull(internalTbl.get(createKeyRow(0), readOnlyTx).join());
}
@Test
public void testReadOnlyGetExistingKeyWithReadTimestamp() {
+ mockReadOnlySingleRowRequest();
+
assertEquals(ROW_2, internalTbl.get(createKeyRow(2), CLOCK.now(), mock(ClusterNode.class)).join());
}
@Test
public void testReadOnlyGetExistingKeyWithTx() {
+ mockReadOnlySingleRowRequest();
+
assertEquals(ROW_2, internalTbl.get(createKeyRow(2), readOnlyTx).join());
}
@Test
public void testReadOnlyGetAllNonExistingKeysWithReadTimestamp() {
+ mockReadOnlyMultiRowRequest();
+
assertEquals(0,
internalTbl.getAll(Collections.singleton(createKeyRow(0)), CLOCK.now(), mock(ClusterNode.class)).join().size()
);
@@ -133,6 +143,8 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
@Test
public void testReadOnlyGetAllNonExistingKeysWithTx() {
+ mockReadOnlyMultiRowRequest();
+
assertEquals(0,
internalTbl.getAll(Collections.singleton(createKeyRow(0)), readOnlyTx).join().size()
);
@@ -140,6 +152,8 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
@Test
public void testReadOnlyGetAllPartiallyExistingKeysWithReadTimestamp() {
+ mockReadOnlyMultiRowRequest();
+
assertEquals(
Collections.singletonList(ROW_2),
internalTbl.getAll(Collections.singleton(createKeyRow(2)), CLOCK.now(), mock(ClusterNode.class)).join()
@@ -148,6 +162,8 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
@Test
public void testReadOnlyGetAllPartiallyExistingKeysWithTx() {
+ mockReadOnlyMultiRowRequest();
+
assertEquals(
Collections.singletonList(ROW_2),
internalTbl.getAll(Collections.singleton(createKeyRow(2)), readOnlyTx).join()
@@ -156,6 +172,8 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
@Test
public void testReadOnlyGetAllExistingKeysWithReadTimestamp() {
+ mockReadOnlyMultiRowRequest();
+
assertEquals(
List.of(ROW_1, ROW_2),
internalTbl.getAll(List.of(createKeyRow(1), createKeyRow(2)), CLOCK.now(), mock(ClusterNode.class)).join()
@@ -164,6 +182,8 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
@Test
public void testReadOnlyGetAllExistingKeysWithTx() {
+ mockReadOnlyMultiRowRequest();
+
assertEquals(
List.of(ROW_1, ROW_2),
internalTbl.getAll(List.of(createKeyRow(1), createKeyRow(2)), readOnlyTx).join()
@@ -206,28 +226,6 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
containsString("Failed to enlist read-write operation into read-only transaction"));
}
- private void mockStorage(List<BinaryRow> submittedItems) {
- // TODO: IGNITE-17859 After index integration get and getAll methods should be used instead of scan.
- AtomicInteger cursorTouchCnt = new AtomicInteger(0);
-
- lenient().when(mockStorage.scan(any(HybridTimestamp.class))).thenAnswer(invocation -> {
- var cursor = mock(PartitionTimestampCursor.class);
-
- lenient().when(cursor.hasNext()).thenAnswer(hnInvocation -> cursorTouchCnt.get() < submittedItems.size());
-
- lenient().when(cursor.next()).thenAnswer(
- ninvocation ->
- ReadResult.createFromCommitted(submittedItems.get(
- cursorTouchCnt.getAndIncrement()),
- new HybridTimestamp(1, 0)
- )
- );
-
- return cursor;
- });
-
- }
-
/**
* Creates a {@link Row} with the supplied key.
*
@@ -257,4 +255,40 @@ public class ItInternalTableReadOnlyOperationsTest extends IgniteAbstractTest {
return new Row(SCHEMA, new ByteBufferRow(rowBuilder.toBytes()));
}
+
+ private void mockReadOnlyMultiRowRequest() {
+ List<BinaryRow> rowStore = List.of(ROW_1, ROW_2);
+
+ when(replicaService.invoke(any(), any(ReadOnlyMultiRowReplicaRequest.class))).thenAnswer(args -> {
+ List<BinaryRow> result = new ArrayList<>();
+
+ for (BinaryRow row : rowStore) {
+ for (BinaryRow searchRow : args.getArgument(1, ReadOnlyMultiRowReplicaRequest.class).binaryRows()) {
+ if (row.keySlice().equals(searchRow.keySlice())) {
+ result.add(row);
+
+ break;
+ }
+ }
+ }
+
+ return CompletableFuture.completedFuture(result);
+ });
+ }
+
+ private void mockReadOnlySingleRowRequest() {
+ List<BinaryRow> rowStore = List.of(ROW_1, ROW_2);
+
+ when(replicaService.invoke(any(), any(ReadOnlySingleRowReplicaRequest.class))).thenAnswer(args -> {
+ for (BinaryRow row : rowStore) {
+ BinaryRow searchRow = args.getArgument(1, ReadOnlySingleRowReplicaRequest.class).binaryRow();
+
+ if (row.keySlice().equals(searchRow.keySlice())) {
+ return CompletableFuture.completedFuture(row);
+ }
+ }
+
+ return CompletableFuture.completedFuture(null);
+ });
+ }
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index 5900c51822..2e7725c550 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -197,7 +197,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
@Override
public BooleanSupplier snapshotCheckClosure(JraftServerImpl restarted, boolean interactedAfterSnapshot) {
MvPartitionStorage storage = getListener(restarted, raftGroupId()).getStorage();
- Map<ByteBuffer, RowId> primaryIndex = getListener(restarted, raftGroupId()).getPk();
+ Map<ByteBuffer, RowId> primaryIndex = null; // getListener(restarted, raftGroupId()).getPk();
Row key = interactedAfterSnapshot ? SECOND_KEY : FIRST_KEY;
Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
@@ -238,7 +238,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
testMpPartStorage,
new TestConcurrentHashMapTxStateStorage(),
txManager,
- new ConcurrentHashMap<>());
+ Map::of
+ );
paths.put(listener, workDir);
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 93bbb5afaf..13e0739c89 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -26,7 +26,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -35,7 +34,6 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -54,13 +52,21 @@ import org.apache.ignite.internal.raft.server.impl.JraftServerImpl;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TxAbstractTest;
+import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
@@ -76,6 +82,7 @@ import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.network.ClusterService;
@@ -323,7 +330,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
Mockito.mock(TxStateTableStorage.class),
startClient() ? clientReplicaSvc : replicaServices.get(localNode),
startClient() ? clientClock : clocks.get(localNode)
- ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA));
+ ), new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), txMgr.lockManager());
this.customers = new TableImpl(new InternalTableImpl(
customersName,
@@ -337,7 +344,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
Mockito.mock(TxStateTableStorage.class),
startClient() ? clientReplicaSvc : replicaServices.get(localNode),
startClient() ? clientClock : clocks.get(localNode)
- ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA));
+ ), new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), txMgr.lockManager());
log.info("Tables have been started");
}
@@ -386,7 +393,22 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
int partId = p;
- ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
+ UUID indexId = UUID.randomUUID();
+
+ BinaryTupleSchema pkSchema = BinaryTupleSchema.create(new Element[]{
+ new Element(NativeTypes.BYTES, false)
+ });
+
+ Function<BinaryRow, BinaryTuple> row2tuple =
+ tableRow -> new BinaryTuple(pkSchema, tableRow.keySlice());
+
+ Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
+ indexId,
+ new TestHashIndexStorage(null),
+ row2tuple
+ ));
+
+ IndexLocker pkLocker = new HashIndexLocker(indexId, true, txManagers.get(node).lockManager(), row2tuple);
CompletableFuture<Void> partitionReadyFuture = raftServers.get(node).prepareRaftGroup(
grpId,
@@ -396,7 +418,7 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
testMpPartStorage,
txSateStorage,
txManagers.get(node),
- primaryIndex
+ () -> Map.of(pkStorage.get().id(), pkStorage.get())
);
},
RaftGroupOptions.defaults()
@@ -412,7 +434,8 @@ public class ItTxDistributedTestSingleNode extends TxAbstractTest {
txManagers.get(node).lockManager(),
partId,
tblId,
- primaryIndex,
+ () -> Map.of(pkLocker.id(), pkLocker),
+ pkStorage,
clocks.get(node),
txSateStorage,
topologyServices.get(node),
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 78a59e7fd2..5b0cc154f3 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -391,7 +391,7 @@ public class ItColocationTest {
schemaRegistry = new DummySchemaManagerImpl(schema);
- tbl = new TableImpl(INT_TABLE, schemaRegistry);
+ tbl = new TableImpl(INT_TABLE, schemaRegistry, new HeapLockManager());
marshaller = new TupleMarshallerImpl(schemaRegistry);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 285a489738..5a58b31484 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -17,15 +17,32 @@
package org.apache.ignite.internal.table;
+import static java.util.concurrent.CompletableFuture.allOf;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.MarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
import org.apache.ignite.internal.schema.marshaller.reflection.KvMarshallerImpl;
import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.KeyValueView;
@@ -34,6 +51,7 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.TestOnly;
/**
* Table view implementation for binary objects.
@@ -42,27 +60,47 @@ public class TableImpl implements Table {
/** Internal table. */
private final InternalTable tbl;
+ private final LockManager lockManager;
+
+ // private final Supplier<List<UUID>> activeIndexIds;
+
/** Schema registry. Should be set either in constructor or via {@link #schemaView(SchemaRegistry)} before start of using the table. */
private volatile SchemaRegistry schemaReg;
+ private final CompletableFuture<UUID> pkId = new CompletableFuture<>();
+
+ private final Map<UUID, CompletableFuture<?>> indexesToWait = new ConcurrentHashMap<>();
+
+ private final Map<UUID, IndexStorageAdapterFactory> indexStorageAdapterFactories = new ConcurrentHashMap<>();
+ private final Map<UUID, IndexLockerFactory> indexLockerFactories = new ConcurrentHashMap<>();
+
/**
* Constructor.
*
* @param tbl The table.
+ * @param lockManager Lock manager.
+ * @param activeIndexIds Supplier of index ids which considered active on the moment of invocation.
*/
- public TableImpl(InternalTable tbl) {
+ public TableImpl(InternalTable tbl, LockManager lockManager, Supplier<List<UUID>> activeIndexIds) {
this.tbl = tbl;
+ this.lockManager = lockManager;
+ // this.activeIndexIds = activeIndexIds;
}
/**
* Constructor.
*
- * @param tbl The table.
+ * @param tbl The table.
* @param schemaReg Table schema registry.
+ * @param lockManager Lock manager.
*/
- public TableImpl(InternalTable tbl, SchemaRegistry schemaReg) {
+ @TestOnly
+ public TableImpl(InternalTable tbl, SchemaRegistry schemaReg, LockManager lockManager) {
this.tbl = tbl;
this.schemaReg = schemaReg;
+ this.lockManager = lockManager;
+
+ // activeIndexIds = List::of;
}
/**
@@ -74,6 +112,20 @@ public class TableImpl implements Table {
return tbl.tableId();
}
+ /**
+ * Provides current table with notion of a primary index.
+ *
+ * @param pkId An identifier of a primary index.
+ */
+ public void pkId(UUID pkId) {
+ this.pkId.complete(Objects.requireNonNull(pkId, "pkId"));
+ }
+
+ /** Returns an identifier of a primary index. */
+ public UUID pkId() {
+ return pkId.join();
+ }
+
/** Returns an internal table instance this view represents. */
public InternalTable internalTable() {
return tbl;
@@ -178,4 +230,156 @@ public class TableImpl implements Table {
public ClusterNode leaderAssignment(int partition) {
return tbl.leaderAssignment(partition);
}
+
+ /** Returns a supplier of index storage wrapper factories for given partition. */
+ public Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexStorageAdapters(int partId) {
+ return () -> {
+ awaitIndexes();
+
+ List<IndexStorageAdapterFactory> factories = new ArrayList<>(indexStorageAdapterFactories.values());
+
+ Map<UUID, TableSchemaAwareIndexStorage> adapters = new HashMap<>();
+
+ for (IndexStorageAdapterFactory factory : factories) {
+ TableSchemaAwareIndexStorage storage = factory.create(partId);
+ adapters.put(storage.id(), storage);
+ }
+
+ return adapters;
+ };
+ }
+
+ /** Returns a supplier of index locker factories for given partition. */
+ public Supplier<Map<UUID, IndexLocker>> indexesLockers(int partId) {
+ return () -> {
+ awaitIndexes();
+
+ List<IndexLockerFactory> factories = new ArrayList<>(indexLockerFactories.values());
+
+ Map<UUID, IndexLocker> lockers = new HashMap<>(factories.size());
+
+ for (IndexLockerFactory factory : factories) {
+ IndexLocker locker = factory.create(partId);
+ lockers.put(locker.id(), locker);
+ }
+
+ return lockers;
+ };
+ }
+
+ /**
+ * Register the index with given id in a table.
+ *
+ * @param indexId An index id os the index to register.
+ * @param unique A flag indicating whether the given index unique or not.
+ * @param searchRowResolver Function which converts given table row to an index key.
+ */
+ public void registerHashIndex(UUID indexId, boolean unique, Function<BinaryRow, BinaryTuple> searchRowResolver) {
+ indexLockerFactories.put(
+ indexId,
+ partitionId -> new HashIndexLocker(
+ indexId,
+ unique,
+ lockManager,
+ searchRowResolver
+ )
+ );
+ indexStorageAdapterFactories.put(
+ indexId,
+ partitionId -> new TableSchemaAwareIndexStorage(
+ indexId,
+ tbl.storage().getOrCreateHashIndex(partitionId, indexId),
+ searchRowResolver
+ )
+ );
+
+ CompletableFuture<?> indexFuture = indexesToWait.remove(indexId);
+
+ if (indexFuture != null) {
+ indexFuture.complete(null);
+ }
+ }
+
+ /**
+ * Register the index with given id in a table.
+ *
+ * @param indexId An index id os the index to register.
+ * @param searchRowResolver Function which converts given table row to an index key.
+ */
+ public void registerSortedIndex(UUID indexId, Function<BinaryRow, BinaryTuple> searchRowResolver) {
+ indexLockerFactories.put(
+ indexId,
+ partitionId -> new SortedIndexLocker(
+ indexId,
+ lockManager,
+ tbl.storage().getOrCreateSortedIndex(partitionId, indexId),
+ searchRowResolver
+ )
+ );
+ indexStorageAdapterFactories.put(
+ indexId,
+ partitionId -> new TableSchemaAwareIndexStorage(
+ indexId,
+ tbl.storage().getOrCreateSortedIndex(partitionId, indexId),
+ searchRowResolver
+ )
+ );
+
+ CompletableFuture<?> indexFuture = indexesToWait.remove(indexId);
+
+ if (indexFuture != null) {
+ indexFuture.complete(null);
+ }
+ }
+
+ /**
+ * Unregister given index from table.
+ *
+ * @param indexId An index id to unregister.
+ */
+ public void unregisterIndex(UUID indexId) {
+ indexLockerFactories.remove(indexId);
+ indexStorageAdapterFactories.remove(indexId);
+ }
+
+ private void awaitIndexes() {
+ // TODO: replace with actual call to ids supplier
+ List<UUID> indexIds = List.of(pkId()); // activeIndexIds.get();
+
+ List<CompletableFuture<?>> toWait = new ArrayList<>();
+
+ for (UUID indexId : indexIds) {
+ if (indexLockerFactories.containsKey(indexId) && indexStorageAdapterFactories.containsKey(indexId)) {
+ continue;
+ }
+
+ CompletableFuture<?> indexFuture = indexesToWait.computeIfAbsent(indexId, k -> new CompletableFuture<>());
+
+ // there is no synchronization between modification of index*Factories collections
+ // and indexesToWait collection, thus we may run into situation, when index was
+ // registered in the between of index existence check and registering a wait future.
+ // This second check aimed to resolve this race
+ if (indexLockerFactories.containsKey(indexId) && indexStorageAdapterFactories.containsKey(indexId)) {
+ indexesToWait.remove(indexId);
+
+ continue;
+ }
+
+ toWait.add(indexFuture);
+ }
+
+ allOf(toWait.toArray(CompletableFuture[]::new)).join();
+ }
+
+ @FunctionalInterface
+ private interface IndexLockerFactory {
+ /** Creates the index decorator for given partition. */
+ IndexLocker create(int partitionId);
+ }
+
+ @FunctionalInterface
+ private interface IndexStorageAdapterFactory {
+ /** Creates the index decorator for given partition. */
+ TableSchemaAwareIndexStorage create(int partitionId);
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/HashIndexLocker.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/HashIndexLocker.java
new file mode 100644
index 0000000000..55797014f3
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/HashIndexLocker.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.tx.LockKey;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.LockMode;
+
+/**
+ * Locker for a hash-based indexes.
+ *
+ * <p>Simply acquires lock on a given row.
+ */
+public class HashIndexLocker implements IndexLocker {
+ private final UUID indexId;
+ private final LockMode modificationMode;
+ private final LockManager lockManager;
+ private final Function<BinaryRow, BinaryTuple> indexRowResolver;
+
+ /**
+ * Constructs the object.
+ *
+ * @param indexId An identifier of the index this locker is created for.
+ * @param lockManager A lock manager to acquire locks in.
+ * @param indexRowResolver A convertor which derives an index key from given table row.
+ */
+ public HashIndexLocker(UUID indexId, boolean unique, LockManager lockManager,
+ Function<BinaryRow, BinaryTuple> indexRowResolver) {
+ this.indexId = indexId;
+ this.modificationMode = unique ? LockMode.X : LockMode.IX;
+ this.lockManager = lockManager;
+ this.indexRowResolver = indexRowResolver;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID id() {
+ return indexId;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<?> locksForLookup(UUID txId, BinaryRow tableRow) {
+ BinaryTuple key = indexRowResolver.apply(tableRow);
+
+ return lockManager.acquire(txId, new LockKey(indexId, key.byteBuffer()), LockMode.S);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<?> locksForInsert(UUID txId, BinaryRow tableRow, RowId rowId) {
+ BinaryTuple key = indexRowResolver.apply(tableRow);
+
+ return lockManager.acquire(txId, new LockKey(indexId, key.byteBuffer()), modificationMode);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<?> locksForRemove(UUID txId, BinaryRow tableRow, RowId rowId) {
+ BinaryTuple key = indexRowResolver.apply(tableRow);
+
+ return lockManager.acquire(txId, new LockKey(indexId, key.byteBuffer()), modificationMode);
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/IndexLocker.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/IndexLocker.java
new file mode 100644
index 0000000000..9890294006
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/IndexLocker.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.RowId;
+
+/**
+ * A decorator interface to hide all tx-protocol-related things.
+ *
+ * <p>Different indexes requires different approaches for locking. Thus every index type has its own implementation of this interface.
+ */
+public interface IndexLocker {
+ /** Returns an identifier of the index this locker created for. */
+ UUID id();
+
+ /**
+ * Acquires the lock for lookup operation.
+ *
+ * @param txId An identifier of the transaction in which the row is read.
+ * @param tableRow A table row to lookup.
+ * @return A future representing a result.
+ */
+ CompletableFuture<?> locksForLookup(UUID txId, BinaryRow tableRow);
+
+ /**
+ * Acquires the lock for insert operation.
+ *
+ * @param txId An identifier of the transaction in which the row is inserted.
+ * @param tableRow A table row to insert.
+ * @param rowId An identifier of the row in the main storage.
+ * @return A future representing a result.
+ */
+ CompletableFuture<?> locksForInsert(UUID txId, BinaryRow tableRow, RowId rowId);
+
+ /**
+ * Acquires the lock for remove operation.
+ *
+ * @param txId An identifier of the transaction in which the row is removed.
+ * @param tableRow A table row to remove.
+ * @param rowId An identifier of the row to remove.
+ * @return A future representing a result.
+ */
+ CompletableFuture<?> locksForRemove(UUID txId, BinaryRow tableRow, RowId rowId);
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
new file mode 100644
index 0000000000..7a6e87a66d
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/SortedIndexLocker.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.tx.LockKey;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.LockMode;
+
+/**
+ * Locker for a sorted indexes.
+ *
+ * <p>Simply acquires lock on a given row for lookup and remove, acquires lock on a next key for insert.
+ */
+public class SortedIndexLocker implements IndexLocker {
+ // private static final BinaryTuple POSITIVE_INF = new BinaryTuple(
+ // BinaryTupleSchema.create(new Element[0]),
+ // new BinaryTupleBuilder(0, false).build()
+ // );
+
+ private final UUID indexId;
+ private final LockManager lockManager;
+ // private final SortedIndexStorage storage;
+ private final Function<BinaryRow, BinaryTuple> indexRowResolver;
+
+ /**
+ * Constructs the object.
+ *
+ * @param indexId An identifier of the index this locker is created for.
+ * @param lockManager A lock manager to acquire locks in.
+ * @param storage A storage of an index this locker is created for.
+ * @param indexRowResolver A convertor which derives an index key from given table row.
+ */
+ public SortedIndexLocker(UUID indexId, LockManager lockManager, SortedIndexStorage storage,
+ Function<BinaryRow, BinaryTuple> indexRowResolver) {
+ this.indexId = indexId;
+ this.lockManager = lockManager;
+ // this.storage = storage;
+ this.indexRowResolver = indexRowResolver;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID id() {
+ return indexId;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<?> locksForLookup(UUID txId, BinaryRow tableRow) {
+ BinaryTuple key = indexRowResolver.apply(tableRow);
+
+ return lockManager.acquire(txId, new LockKey(indexId, key.byteBuffer()), LockMode.S);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<?> locksForInsert(UUID txId, BinaryRow tableRow, RowId rowId) {
+ BinaryTuple key = indexRowResolver.apply(tableRow);
+ // BinaryTuplePrefix prefix = BinaryTuplePrefix.fromBinaryTuple(key);
+
+ // find next key
+ // Cursor<IndexRow> cursor = storage.scan(prefix, null, SortedIndexStorage.GREATER);
+
+ // BinaryTuple nexKey;
+ // if (cursor.hasNext()) {
+ // nexKey = cursor.next().indexColumns();
+ // } else { // otherwise INF
+ // nexKey = POSITIVE_INF;
+ // }
+
+ // var nextLockKey = new LockKey(indexId, nexKey.byteBuffer());
+
+ // return lockManager.acquire(txId, nextLockKey, LockMode.IX)
+ // .thenCompose(shortLock ->
+ return lockManager.acquire(txId, new LockKey(indexId, key.byteBuffer()), LockMode.X);
+ // .thenRun(() -> {
+ // storage.put(new IndexRowImpl(key, rowId));
+
+ // lockManager.release(shortLock);
+ // })
+ // );
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<?> locksForRemove(UUID txId, BinaryRow tableRow, RowId rowId) {
+ BinaryTuple key = indexRowResolver.apply(tableRow);
+
+ return lockManager.acquire(txId, new LockKey(indexId, key.byteBuffer()), LockMode.IX);
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 6bd629ffaf..995dd33695 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -37,7 +37,6 @@ import static org.apache.ignite.internal.utils.RebalanceUtil.updatePendingAssign
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -109,7 +108,6 @@ import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.IgniteTablesInternal;
@@ -136,6 +134,7 @@ import org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbTableSt
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.IgniteNameUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.utils.RebalanceUtil;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -671,7 +670,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return failedFuture(e);
}
- InternalTable internalTbl = tablesById.get(tblId).internalTable();
+ TableImpl table = tablesById.get(tblId);
+ InternalTable internalTbl = table.internalTable();
MvTableStorage storage = internalTbl.storage();
boolean isInMemory = storage.isVolatile();
@@ -690,8 +690,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
CompletableFuture<Void> startGroupFut = CompletableFuture.completedFuture(null);
- ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
-
if (raftMgr.shouldHaveRaftGroupLocally(nodes)) {
startGroupFut = CompletableFuture
.supplyAsync(() -> internalTbl.storage().getOrCreateMvPartition(partId), ioExecutor)
@@ -744,7 +742,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
partitionStorage,
internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
txManager,
- primaryIndex
+ table.indexStorageAdapters(partId)
),
new RebalanceRaftGroupEventsListener(
metaStorageMgr,
@@ -792,7 +790,9 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
lockMgr,
partId,
tblId,
- primaryIndex,
+ table.indexesLockers(partId),
+ new Lazy<>(() -> table.indexStorageAdapters(partId)
+ .get().get(table.pkId())),
clock,
internalTbl.txStateStorage().getOrCreateTxStateStorage(partId),
topologyService,
@@ -972,7 +972,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
InternalTableImpl internalTable = new InternalTableImpl(name, tblId, new Int2ObjectOpenHashMap<>(partitions),
partitions, netAddrResolver, clusterNodeResolver, txManager, tableStorage, txStateStorage, replicaSvc, clock);
- var table = new TableImpl(internalTable);
+ var table = new TableImpl(internalTable, lockMgr, this::directIndexIds);
tablesByIdVv.update(causalityToken, (previous, e) -> inBusyLock(busyLock, () -> {
if (e != null) {
@@ -996,8 +996,11 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
tablesToStopInCaseOfError.put(tblId, table);
+ tablesByIdVv.get(causalityToken)
+ .thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
+
// TODO should be reworked in IGNITE-16763
- return tablesByIdVv.get(causalityToken).thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table)));
+ return CompletableFuture.completedFuture(null);
}
/**
@@ -1455,6 +1458,15 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
return ConfigurationUtil.internalIds(directProxy(tablesCfg.tables()));
}
+ /**
+ * Collects a list of direct index ids.
+ *
+ * @return A list of direct index ids.
+ */
+ private List<UUID> directIndexIds() {
+ return ConfigurationUtil.internalIds(directProxy(tablesCfg.indexes()));
+ }
+
/**
* Gets direct id of table with {@code tblName}.
*
@@ -1709,8 +1721,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
.filter(p -> !assignments.contains(p))
.collect(Collectors.toList());
- ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
-
try {
LOG.info("Received update on pending assignments. Check if new raft group should be started"
+ " [key={}, partition={}, table={}, localMemberAddress={}]",
@@ -1730,7 +1740,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
partitionStorage,
tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
txManager,
- primaryIndex
+ tbl.indexStorageAdapters(partId)
);
RaftGroupEventsListener raftGrpEvtsLsnr = new RebalanceRaftGroupEventsListener(
@@ -1764,7 +1774,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
lockMgr,
partId,
tblId,
- primaryIndex,
+ tbl.indexesLockers(partId),
+ new Lazy<>(() -> tbl.indexStorageAdapters(partId).get().get(tbl.pkId())),
clock,
tbl.internalTable().txStateStorage().getOrCreateTxStateStorage(partId),
raftMgr.topologyService(),
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
new file mode 100644
index 0000000000..f3e2d7757e
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableSchemaAwareIndexStorage.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed;
+
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.IndexRowImpl;
+import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.util.Cursor;
+
+/**
+ * An adapter that provides an index storage with a notion of the structure of a table row,
+ * i.e. derives the index key from a given table row.
+ */
+public class TableSchemaAwareIndexStorage {
+ private final UUID indexId;
+ private final IndexStorage storage;
+ private final Function<BinaryRow, BinaryTuple> indexRowResolver;
+
+ /** Constructs the object. */
+ public TableSchemaAwareIndexStorage(
+ UUID indexId,
+ IndexStorage storage,
+ Function<BinaryRow, BinaryTuple> indexRowResolver
+ ) {
+ this.indexId = indexId;
+ this.storage = storage;
+ this.indexRowResolver = indexRowResolver;
+ }
+
+ /** Returns an identifier of the index. */
+ public UUID id() {
+ return indexId;
+ }
+
+ /** Returns a cursor over {@code RowId}s associated with the given key. */
+ public Cursor<RowId> get(BinaryRow tableRow) throws StorageException {
+ BinaryTuple tuple = indexRowResolver.apply(tableRow);
+
+ return storage.get(tuple);
+ }
+
+ /**
+ * Inserts the given table row to an index storage.
+ *
+ * @param tableRow A table row to insert.
+ * @param rowId An identifier of a row in a main storage.
+ */
+ public void put(BinaryRow tableRow, RowId rowId) {
+ BinaryTuple tuple = indexRowResolver.apply(tableRow);
+
+ storage.put(new IndexRowImpl(tuple, rowId));
+ }
+
+ /**
+ * Removes the given table row from an index storage.
+ *
+ * @param tableRow A table row to remove.
+ * @param rowId An identifier of a row in a main storage.
+ */
+ public void remove(BinaryRow tableRow, RowId rowId) {
+ BinaryTuple tuple = indexRowResolver.apply(tableRow);
+
+ storage.remove(new IndexRowImpl(tuple, rowId));
+ }
+
+ /** Returns underlying index storage. */
+ public IndexStorage storage() {
+ return storage;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 28f600d67c..9b85b1ff39 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -19,31 +19,26 @@ package org.apache.ignite.internal.table.distributed.raft;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITED;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_UNEXPECTED_STATE_ERR;
import static org.apache.ignite.lang.IgniteStringFormatter.format;
-import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
+import java.util.function.Supplier;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.storage.DataRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.internal.storage.basic.BinarySearchRow;
-import org.apache.ignite.internal.storage.basic.DelegatingDataRow;
+import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
@@ -52,14 +47,13 @@ import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
-import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.raft.client.Command;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.apache.ignite.raft.client.service.RaftGroupListener;
-import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
@@ -78,18 +72,10 @@ public class PartitionListener implements RaftGroupListener {
/** Transaction manager. */
private final TxManager txManager;
- //TODO: https://issues.apache.org/jira/browse/IGNITE-17205 Temporary solution until the implementation of the primary index is done.
- /** Dummy primary index. */
- private final ConcurrentHashMap<ByteBuffer, RowId> primaryIndex;
-
- /** Keys that were inserted by a transaction. */
- private HashMap<UUID, Set<ByteBuffer>> txsInsertedKeys = new HashMap<>();
-
- /** Keys that were removed by a transaction. */
- private HashMap<UUID, Set<ByteBuffer>> txsRemovedKeys = new HashMap<>();
+ private final Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes;
/** Rows that were inserted, updated or removed. */
- private HashMap<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>();
+ private final HashMap<UUID, Set<RowId>> txsPendingRowIds = new HashMap<>();
/**
* The constructor.
@@ -97,18 +83,17 @@ public class PartitionListener implements RaftGroupListener {
* @param store The storage.
* @param txStateStorage Transaction state storage.
* @param txManager Transaction manager.
- * @param primaryIndex Primary index map.
*/
public PartitionListener(
MvPartitionStorage store,
TxStateStorage txStateStorage,
TxManager txManager,
- ConcurrentHashMap<ByteBuffer, RowId> primaryIndex
+ Supplier<Map<UUID, TableSchemaAwareIndexStorage>> indexes
) {
this.storage = store;
this.txStateStorage = txStateStorage;
this.txManager = txManager;
- this.primaryIndex = primaryIndex;
+ this.indexes = indexes;
}
/** {@inheritDoc} */
@@ -171,28 +156,7 @@ public class PartitionListener implements RaftGroupListener {
txsPendingRowIds.computeIfAbsent(txId, entry -> new HashSet<>()).add(rowId);
- if (row == null) {
- // Remove entry.
- List<ByteBuffer> keys = primaryIndex.entrySet().stream()
- .filter(e -> e.getValue().equals(rowId))
- .map(Entry::getKey)
- .collect(Collectors.toList());
-
- assert keys.size() <= 1;
-
- if (keys.size() == 1) {
- txsRemovedKeys.computeIfAbsent(txId, entry -> new HashSet<>()).add(keys.get(0));
- txsInsertedKeys.computeIfAbsent(txId, entry -> new HashSet<>()).remove(keys.get(0));
- }
- } else if (!primaryIndex.containsKey(row.keySlice())) {
- // Insert entry.
- txsInsertedKeys.computeIfAbsent(txId, entry -> new HashSet<>()).add(row.keySlice());
- txsRemovedKeys.computeIfAbsent(txId, entry -> new HashSet<>()).remove(row.keySlice());
-
- primaryIndex.put(row.keySlice(), rowId);
- } else if (primaryIndex.containsKey(row.keySlice())) {
- txsRemovedKeys.computeIfAbsent(txId, entry -> new HashSet<>()).remove(row.keySlice());
- }
+ addToIndexes(row, rowId);
storage.lastAppliedIndex(commandIndex);
@@ -212,7 +176,7 @@ public class PartitionListener implements RaftGroupListener {
UUID commitTblId = cmd.getReplicationGroupId().getTableId();
int commitPartId = cmd.getReplicationGroupId().getPartId();
- if (!CollectionUtils.nullOrEmpty(rowsToUpdate)) {
+ if (!nullOrEmpty(rowsToUpdate)) {
for (Map.Entry<RowId, BinaryRow> entry : rowsToUpdate.entrySet()) {
RowId rowId = entry.getKey();
BinaryRow row = entry.getValue();
@@ -221,28 +185,7 @@ public class PartitionListener implements RaftGroupListener {
txsPendingRowIds.computeIfAbsent(txId, entry0 -> new HashSet<>()).add(rowId);
- if (row == null) {
- // Remove entry.
- List<ByteBuffer> keys = primaryIndex.entrySet().stream()
- .filter(e -> e.getValue().equals(rowId))
- .map(Entry::getKey)
- .collect(Collectors.toList());
-
- assert keys.size() <= 1;
-
- if (keys.size() == 1) {
- txsRemovedKeys.computeIfAbsent(txId, entry0 -> new HashSet<>()).add(keys.get(0));
- txsInsertedKeys.computeIfAbsent(txId, entry0 -> new HashSet<>()).remove(keys.get(0));
- }
- } else if (!primaryIndex.containsKey(row.keySlice())) {
- // Insert entry.
- txsInsertedKeys.computeIfAbsent(txId, entry0 -> new HashSet<>()).add(row.keySlice());
- txsRemovedKeys.computeIfAbsent(txId, entry0 -> new HashSet<>()).remove(row.keySlice());
-
- primaryIndex.put(row.keySlice(), rowId);
- } else if (primaryIndex.containsKey(row.keySlice())) {
- txsRemovedKeys.computeIfAbsent(txId, entry0 -> new HashSet<>()).remove(row.keySlice());
- }
+ addToIndexes(row, rowId);
}
}
storage.lastAppliedIndex(commandIndex);
@@ -309,30 +252,14 @@ public class PartitionListener implements RaftGroupListener {
storage.runConsistently(() -> {
UUID txId = cmd.txId();
- Set<ByteBuffer> removedKeys = txsRemovedKeys.getOrDefault(txId, Collections.emptySet());
-
- Set<ByteBuffer> insertedKeys = txsInsertedKeys.getOrDefault(txId, Collections.emptySet());
-
Set<RowId> pendingRowIds = txsPendingRowIds.getOrDefault(txId, Collections.emptySet());
if (cmd.commit()) {
pendingRowIds.forEach(rowId -> storage.commitWrite(rowId, cmd.commitTimestamp()));
} else {
- pendingRowIds.forEach(rowId -> storage.abortWrite(rowId));
+ pendingRowIds.forEach(storage::abortWrite);
}
- if (cmd.commit()) {
- for (ByteBuffer key : removedKeys) {
- primaryIndex.remove(key);
- }
- } else {
- for (ByteBuffer key : insertedKeys) {
- primaryIndex.remove(key);
- }
- }
-
- txsRemovedKeys.remove(txId);
- txsInsertedKeys.remove(txId);
txsPendingRowIds.remove(txId);
// TODO: IGNITE-17638 TestOnly code, let's consider using Txn state map instead of states.
@@ -366,15 +293,14 @@ public class PartitionListener implements RaftGroupListener {
}
}
- /**
- * Extracts a key and a value from the {@link BinaryRow} and wraps it in a {@link DataRow}.
- *
- * @param row Binary row.
- * @return Data row.
- */
- @NotNull
- private static DataRow extractAndWrapKeyValue(@NotNull BinaryRow row) {
- return new DelegatingDataRow(new BinarySearchRow(row), row.bytes());
+ private void addToIndexes(@Nullable BinaryRow tableRow, RowId rowId) {
+ if (tableRow == null || !tableRow.hasValue()) { // skip removes
+ return;
+ }
+
+ for (TableSchemaAwareIndexStorage index : indexes.get().values()) {
+ index.put(tableRow, rowId);
+ }
}
/**
@@ -384,12 +310,4 @@ public class PartitionListener implements RaftGroupListener {
public MvPartitionStorage getStorage() {
return storage;
}
-
- /**
- * Returns a primary index map.
- */
- @TestOnly
- public Map<ByteBuffer, RowId> getPk() {
- return primaryIndex;
- }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 89f034f077..018a302666 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -19,10 +19,9 @@ package org.apache.ignite.internal.table.distributed.replicator;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
-import static org.apache.ignite.lang.ErrorGroups.Replicator.CURSOR_CLOSE_ERR;
+import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import static org.apache.ignite.lang.IgniteStringFormatter.format;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -35,6 +34,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -51,6 +51,8 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.command.FinishTxCommand;
import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
@@ -65,7 +67,6 @@ import org.apache.ignite.internal.table.distributed.replication.request.ReadWrit
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSwapRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
-import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockKey;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.LockMode;
@@ -77,10 +78,11 @@ import org.apache.ignite.internal.tx.message.TxFinishReplicaRequest;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.TxStateReplicaRequest;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.network.TopologyService;
@@ -100,11 +102,8 @@ public class PartitionReplicaListener implements ReplicaListener {
/** Partition id. */
private final int partId;
- /** Primary key id. */
- public final UUID indexPkId;
-
- /** Scan index id. */
- public final UUID indexScanId;
+ /** Primary key index. */
+ public final Lazy<TableSchemaAwareIndexStorage> pkIndexStorage;
/** Table id. */
private final UUID tableId;
@@ -121,10 +120,6 @@ public class PartitionReplicaListener implements ReplicaListener {
/** Lock manager. */
private final LockManager lockManager;
- //TODO: https://issues.apache.org/jira/browse/IGNITE-17205 Temporary solution until the implementation of the primary index is done.
- /** Dummy primary index. */
- private final ConcurrentHashMap<ByteBuffer, RowId> primaryIndex;
-
/**
* Cursors map. The key of the map is internal Ignite uuid which consists of a transaction id ({@link UUID}) and a cursor id ({@link
* Long}).
@@ -147,7 +142,9 @@ public class PartitionReplicaListener implements ReplicaListener {
* Map to control clock's update in the read only transactions concurrently with a commit timestamp.
* TODO: IGNITE-17261 review this after the commit timestamp will be provided from a commit request (request.commitTimestamp()).
*/
- ConcurrentHashMap<UUID, CompletableFuture<TxMeta>> txTimestampUpdateMap = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<UUID, CompletableFuture<TxMeta>> txTimestampUpdateMap = new ConcurrentHashMap<>();
+
+ private final Supplier<Map<UUID, IndexLocker>> indexesLockers;
/**
* The constructor.
@@ -158,7 +155,6 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param lockManager Lock manager.
* @param partId Partition id.
* @param tableId Table id.
- * @param primaryIndex Primary index.
* @param hybridClock Hybrid clock.
* @param txStateStorage Transaction state storage.
* @param topologyService Topology services.
@@ -171,7 +167,8 @@ public class PartitionReplicaListener implements ReplicaListener {
LockManager lockManager,
int partId,
UUID tableId,
- ConcurrentHashMap<ByteBuffer, RowId> primaryIndex,
+ Supplier<Map<UUID, IndexLocker>> indexesLockers,
+ Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
HybridClock hybridClock,
TxStateStorage txStateStorage,
TopologyService topologyService,
@@ -183,15 +180,13 @@ public class PartitionReplicaListener implements ReplicaListener {
this.lockManager = lockManager;
this.partId = partId;
this.tableId = tableId;
- this.primaryIndex = primaryIndex;
+ this.indexesLockers = indexesLockers;
+ this.pkIndexStorage = pkIndexStorage;
this.hybridClock = hybridClock;
this.txStateStorage = txStateStorage;
this.topologyService = topologyService;
this.placementDriver = placementDriver;
- //TODO: IGNITE-17479 Integrate indexes into replicaListener command handlers
- this.indexScanId = new UUID(tableId.getMostSignificantBits(), tableId.getLeastSignificantBits() + 1);
- this.indexPkId = new UUID(tableId.getMostSignificantBits(), tableId.getLeastSignificantBits() + 2);
this.replicationGroupId = new TablePartitionId(tableId, partId);
cursors = new ConcurrentSkipListMap<>((o1, o2) -> {
@@ -336,62 +331,17 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Result future.
*/
private CompletableFuture<Object> processReadOnlySingleEntryAction(ReadOnlySingleRowReplicaRequest request) {
- ByteBuffer searchKey = request.binaryRow().keySlice();
+ BinaryRow tableRow = request.binaryRow();
+ HybridTimestamp readTimestamp = request.readTimestamp();
if (request.requestType() != RequestType.RO_GET) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
+ format("Unknown single request [actionType={}]", request.requestType()));
}
//TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
- HybridTimestamp readTimestamp = request.readTimestamp();
-
- try (PartitionTimestampCursor scan = mvDataStorage.scan(readTimestamp)) {
- while (scan.hasNext()) {
- ReadResult readResult = scan.next();
- HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
-
- if (readResult.binaryRow() == null) {
- if (newestCommitTimestamp == null) {
- throw new AssertionError("Unexpected null value of the newest committed timestamp.");
- }
-
- BinaryRow candidate = scan.committed(newestCommitTimestamp);
- if (candidate == null) {
- throw new AssertionError("Unexpected null value of the candidate binary row.");
- }
- if (candidate.keySlice().equals(searchKey)) {
- return CompletableFuture.completedFuture(
- resolveReadResult(
- readResult,
- readTimestamp,
- () -> scan.committed(newestCommitTimestamp)
- )
- );
- }
- } else if (readResult.binaryRow().keySlice().equals(searchKey)) {
- return CompletableFuture.completedFuture(
- resolveReadResult(
- readResult,
- readTimestamp,
- () -> newestCommitTimestamp == null ? null : scan.committed(newestCommitTimestamp)
- )
- );
- }
- }
- } catch (Exception e) {
- return failedFuture(
- withCause(
- ReplicationException::new,
- CURSOR_CLOSE_ERR,
- "Failed to close cursor.",
- e
- )
- );
- }
-
- return CompletableFuture.completedFuture(null);
+ return resolveRowByPk(tableRow, readTimestamp, (rowId, binaryRow) -> CompletableFuture.completedFuture(binaryRow));
}
/**
@@ -401,64 +351,17 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Result future.
*/
private CompletableFuture<Object> processReadOnlyMultiEntryAction(ReadOnlyMultiRowReplicaRequest request) {
- Collection<ByteBuffer> keyRows = request.binaryRows().stream().map(br -> br.keySlice()).collect(
- Collectors.toList());
-
- if (request.requestType() != RequestType.RO_GET_ALL) {
+ if (request.requestType() != RequestType.RO_GET_ALL) {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
+ format("Unknown single request [actionType={}]", request.requestType()));
}
- ArrayList<BinaryRow> result = new ArrayList<>(keyRows.size());
+ ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
- //TODO: IGNITE-17868 Integrate indexes into rowIds resolution along with proper lock management on search rows.
- HybridTimestamp readTimestamp = request.readTimestamp();
+ for (BinaryRow searchRow : request.binaryRows()) {
+ BinaryRow row = resolveRowByPk(searchRow, request.readTimestamp(), (rowId, binaryRow) -> binaryRow);
- try (PartitionTimestampCursor scan = mvDataStorage.scan(readTimestamp)) {
- while (scan.hasNext()) {
- ReadResult readResult = scan.next();
- HybridTimestamp newestCommitTimestamp = readResult.newestCommitTimestamp();
-
- for (ByteBuffer searchKey : keyRows) {
- if (readResult.binaryRow() == null) {
- if (newestCommitTimestamp == null) {
- throw new AssertionError("Unexpected null value of the newest committed timestamp.");
- }
-
- BinaryRow candidate = scan.committed(readResult.newestCommitTimestamp());
- if (candidate == null) {
- throw new AssertionError("Unexpected null value of the candidate binary row.");
- }
-
- if (candidate.keySlice().equals(searchKey)) {
- result.add(
- resolveReadResult(
- readResult,
- readTimestamp,
- () -> scan.committed(readResult.newestCommitTimestamp())
- )
- );
- }
- } else if (readResult.binaryRow().keySlice().equals(searchKey)) {
- result.add(
- resolveReadResult(
- readResult,
- readTimestamp,
- () -> newestCommitTimestamp == null ? null : scan.committed(readResult.newestCommitTimestamp())
- )
- );
- }
- }
- }
- } catch (Exception e) {
- return failedFuture(
- withCause(
- ReplicationException::new,
- CURSOR_CLOSE_ERR,
- "Failed to close cursor.",
- e
- )
- );
+ result.add(row);
}
return CompletableFuture.completedFuture(result);
@@ -484,7 +387,7 @@ public class PartitionReplicaListener implements ReplicaListener {
} catch (Exception e) {
if (ex == null) {
ex = new ReplicationException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("Close cursor exception [replicaGrpId={}, msg={}]", replicationGroupId,
+ format("Close cursor exception [replicaGrpId={}, msg={}]", replicationGroupId,
e.getMessage()), e);
} else {
ex.addSuppressed(e);
@@ -517,7 +420,7 @@ public class PartitionReplicaListener implements ReplicaListener {
cursor.close();
} catch (Exception e) {
throw new ReplicationException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("Close cursor exception [replicaGrpId={}, msg={}]", replicationGroupId,
+ format("Close cursor exception [replicaGrpId={}, msg={}]", replicationGroupId,
e.getMessage()), e);
}
}
@@ -544,7 +447,7 @@ public class PartitionReplicaListener implements ReplicaListener {
while (batchRows.size() < batchCount && cursor.hasNext()) {
BinaryRow resolvedReadResult = resolveReadResult(cursor.next(), txId);
- if (resolvedReadResult != null) {
+ if (resolvedReadResult != null && resolvedReadResult.hasValue()) {
batchRows.add(resolvedReadResult);
}
}
@@ -654,42 +557,84 @@ public class PartitionReplicaListener implements ReplicaListener {
}
/**
- * Returns index id of default {@lonk INDEX_SCAN_ID} index that will be used for operation.
+ * Finds the row and its identifier by given pk search row.
*
- * @param indexId Index id or {@code null}.
- * @return Index id.
+ * @param tableRow A bytes representing a primary tableRow.
+ * @param ts A timestamp regarding which we need to resolve the given row.
+ * @param action An action to perform on a resolved row.
+ * @param <T> A type of the value returned by action.
+ * @return Result of the given action.
*/
- private @NotNull UUID indexIdOrDefault(@Nullable UUID indexId) {
- return indexId != null ? indexId : indexScanId;
+ private <T> T resolveRowByPk(
+ BinaryRow tableRow,
+ HybridTimestamp ts,
+ BiFunction<@Nullable RowId, @Nullable BinaryRow, T> action
+ ) {
+ try (Cursor<RowId> cursor = pkIndexStorage.get().get(tableRow)) {
+ for (RowId rowId : cursor) {
+ ReadResult readResult = mvDataStorage.read(rowId, ts);
+
+ BinaryRow row = resolveReadResult(readResult, ts, () -> {
+ if (readResult.newestCommitTimestamp() == null) {
+ return null;
+ }
+
+ ReadResult committedReadResult = mvDataStorage.read(rowId, readResult.newestCommitTimestamp());
+
+ assert !committedReadResult.isWriteIntent() :
+ "The result is not committed [rowId=" + rowId + ", timestamp="
+ + readResult.newestCommitTimestamp() + ']';
+
+ return committedReadResult.binaryRow();
+ });
+
+ if (row != null && row.hasValue()) {
+ return action.apply(rowId, row);
+ }
+ }
+
+ return action.apply(null, null);
+ } catch (Exception e) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ format("Unable to close cursor [tableId={}]", tableId), e);
+ }
}
/**
- * Find out a row id by an index.
- * TODO: IGNITE-17479 Integrate indexes into replicaListener command handlers
+ * Finds the row and its identifier by given pk search row.
*
- * @param indexId Index id.
- * @param key Key to find.
- * @return Value or {@code null} if the key does not determine a value.
+ * @param tableRow A bytes representing a primary key.
+ * @param txId An identifier of the transaction regarding which we need to resolve the given row.
+ * @param action An action to perform on a resolved row.
+ * @param <T> A type of the value returned by action.
+ * @return A future object representing the result of the given action.
*/
- private RowId rowIdByKey(@NotNull UUID indexId, ByteBuffer key) {
- if (indexPkId.equals(indexId)) {
- return primaryIndex.get(key);
- }
+ private <T> CompletableFuture<T> resolveRowByPk(
+ BinaryRow tableRow,
+ UUID txId,
+ BiFunction<@Nullable RowId, @Nullable BinaryRow, CompletableFuture<T>> action
+ ) {
+ IndexLocker pkLocker = indexesLockers.get().get(pkIndexStorage.get().id());
- if (indexScanId.equals(indexId)) {
- RowId[] rowIdHolder = new RowId[1];
+ assert pkLocker != null;
- mvDataStorage.forEach((rowId, binaryRow) -> {
- if (rowIdHolder[0] == null && binaryRow.keySlice().equals(key)) {
- rowIdHolder[0] = rowId;
- }
- });
+ return pkLocker.locksForLookup(txId, tableRow)
+ .thenCompose(ignored -> {
+ try (Cursor<RowId> cursor = pkIndexStorage.get().get(tableRow)) {
+ for (RowId rowId : cursor) {
+ BinaryRow row = resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId);
- return rowIdHolder[0];
- }
+ if (row != null && row.hasValue()) {
+ return action.apply(rowId, row);
+ }
+ }
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("The index does not exist [indexId={}]", indexId));
+ return action.apply(null, null);
+ } catch (Exception e) {
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ format("Unable to close cursor [tableId={}]", tableId), e);
+ }
+ });
}
/**
@@ -714,8 +659,6 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Listener response.
*/
private CompletableFuture<Object> processMultiEntryAction(ReadWriteMultiRowReplicaRequest request) {
- UUID indexId = indexIdOrDefault(indexPkId/*request.indexToUse()*/);
-
UUID txId = request.transactionId();
TablePartitionId committedPartitionId = request.commitPartitionId();
@@ -724,57 +667,69 @@ public class PartitionReplicaListener implements ReplicaListener {
switch (request.requestType()) {
case RW_GET_ALL: {
- CompletableFuture<RowId>[] getLockFuts = new CompletableFuture[request.binaryRows().size()];
+ CompletableFuture<BinaryRow>[] rowFuts = new CompletableFuture[request.binaryRows().size()];
int i = 0;
- for (BinaryRow row : request.binaryRows()) {
- getLockFuts[i++] = takeLocksForGet(row.keySlice(), indexId, txId);
- }
+ for (BinaryRow searchRow : request.binaryRows()) {
+ rowFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
- return allOf(getLockFuts).thenApply(ignore -> {
- ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
+ return takeLocksForGet(rowId, txId)
+ .thenApply(ignored -> row);
+ });
+ }
- for (int futNum = 0; futNum < request.binaryRows().size(); futNum++) {
- RowId lockedRowId = getLockFuts[futNum].join();
+ return allOf(rowFuts)
+ .thenCompose(ignored -> {
+ ArrayList<BinaryRow> result = new ArrayList<>(request.binaryRows().size());
- result.add(lockedRowId != null
- ? resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE), txId) : null
- );
- }
+ for (int idx = 0; idx < request.binaryRows().size(); idx++) {
+ result.add(rowFuts[idx].join());
+ }
- return result;
- });
+ return CompletableFuture.completedFuture(result);
+ });
}
case RW_DELETE_ALL: {
- CompletableFuture<RowId>[] deleteLockFuts = new CompletableFuture[request.binaryRows().size()];
+ CompletableFuture<RowId>[] rowIdLockFuts = new CompletableFuture[request.binaryRows().size()];
int i = 0;
- for (BinaryRow row : request.binaryRows()) {
- deleteLockFuts[i++] = takeLocksForDelete(row.keySlice(), indexId, txId);
+ for (BinaryRow searchRow : request.binaryRows()) {
+ rowIdLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return takeLocksForDelete(searchRow, rowId, txId);
+ });
}
- return allOf(deleteLockFuts).thenCompose(ignore -> {
- Collection<RowId> rowIdsToDelete = new ArrayList<>();
+ return allOf(rowIdLockFuts).thenCompose(ignore -> {
+ Map<RowId, BinaryRow> rowIdsToDelete = new HashMap<>();
Collection<BinaryRow> result = new ArrayList<>();
int futNum = 0;
for (BinaryRow row : request.binaryRows()) {
- RowId lockedRowId = deleteLockFuts[futNum++].join();
+ RowId lockedRowId = rowIdLockFuts[futNum++].join();
if (lockedRowId != null) {
- rowIdsToDelete.add(lockedRowId);
+ rowIdsToDelete.put(lockedRowId, row);
} else {
result.add(row);
}
}
- CompletableFuture raftFut = rowIdsToDelete.isEmpty() ? CompletableFuture.completedFuture(null)
- : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId));
+ if (rowIdsToDelete.isEmpty()) {
+ return CompletableFuture.completedFuture(result);
+ }
- return raftFut.thenApply(ignored -> result);
+ return applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowIdsToDelete, txId))
+ .thenApply(ignored -> result);
});
}
case RW_DELETE_EXACT_ALL: {
@@ -782,8 +737,14 @@ public class PartitionReplicaListener implements ReplicaListener {
int i = 0;
- for (BinaryRow row : request.binaryRows()) {
- deleteExactLockFuts[i++] = takeLocksForDeleteExact(row.keySlice(), row, indexId, txId);
+ for (BinaryRow searchRow : request.binaryRows()) {
+ deleteExactLockFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ return takeLocksForDeleteExact(searchRow, rowId, row, txId);
+ });
}
return allOf(deleteExactLockFuts).thenCompose(ignore -> {
@@ -809,22 +770,23 @@ public class PartitionReplicaListener implements ReplicaListener {
});
}
case RW_INSERT_ALL: {
- CompletableFuture<RowId>[] insertLockFuts = new CompletableFuture[request.binaryRows().size()];
+ CompletableFuture<RowId>[] pkReadLockFuts = new CompletableFuture[request.binaryRows().size()];
int i = 0;
- for (BinaryRow row : request.binaryRows()) {
- insertLockFuts[i++] = takeLocksForInsert(row.keySlice(), indexId, txId);
+ for (BinaryRow searchRow : request.binaryRows()) {
+ pkReadLockFuts[i++] = resolveRowByPk(searchRow, txId,
+ (rowId, row) -> CompletableFuture.completedFuture(rowId));
}
- return allOf(insertLockFuts).thenCompose(ignore -> {
+ return allOf(pkReadLockFuts).thenCompose(ignore -> {
Collection<BinaryRow> result = new ArrayList<>();
Map<RowId, BinaryRow> rowsToInsert = new HashMap<>();
int futNum = 0;
for (BinaryRow row : request.binaryRows()) {
- RowId lockedRow = insertLockFuts[futNum++].join();
+ RowId lockedRow = pkReadLockFuts[futNum++].join();
if (lockedRow != null) {
result.add(row);
@@ -837,45 +799,63 @@ public class PartitionReplicaListener implements ReplicaListener {
}
}
- CompletableFuture raftFut = rowsToInsert.isEmpty() ? CompletableFuture.completedFuture(null)
- : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowsToInsert, txId));
+ if (rowsToInsert.isEmpty()) {
+ return CompletableFuture.completedFuture(result);
+ }
- return raftFut.thenApply(ignored -> result);
+ CompletableFuture<RowId>[] insertLockFuts = new CompletableFuture[rowsToInsert.size()];
+
+ int idx = 0;
+
+ for (Map.Entry<RowId, BinaryRow> entry : rowsToInsert.entrySet()) {
+ insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
+ }
+
+ return allOf(insertLockFuts)
+ .thenCompose(ignored -> applyCmdWithExceptionHandling(
+ new UpdateAllCommand(committedPartitionId, rowsToInsert, txId)))
+ .thenApply(ignored -> result);
});
}
case RW_UPSERT_ALL: {
- CompletableFuture<RowId>[] upsertLockFuts = new CompletableFuture[request.binaryRows().size()];
+ CompletableFuture<RowId>[] rowIdFuts = new CompletableFuture[request.binaryRows().size()];
int i = 0;
- for (BinaryRow row : request.binaryRows()) {
- upsertLockFuts[i++] = takeLocksForUpsert(row.keySlice(), indexId, txId);
+ for (BinaryRow searchRow : request.binaryRows()) {
+ rowIdFuts[i++] = resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ boolean insert = rowId == null;
+
+ RowId rowId0 = insert ? new RowId(partId) : rowId;
+
+ return insert
+ ? takeLocksForInsert(searchRow, rowId0, txId)
+ : takeLocksForUpdate(searchRow, rowId0, txId);
+ });
}
- return allOf(upsertLockFuts).thenCompose(ignore -> {
+ return allOf(rowIdFuts).thenCompose(ignore -> {
Map<RowId, BinaryRow> rowsToUpdate = new HashMap<>();
int futNum = 0;
for (BinaryRow row : request.binaryRows()) {
- RowId lockedRow = upsertLockFuts[futNum++].join();
+ RowId lockedRow = rowIdFuts[futNum++].join();
- if (lockedRow != null) {
- rowsToUpdate.put(lockedRow, row);
- } else {
- rowsToUpdate.put(new RowId(partId), row);
- }
+ rowsToUpdate.put(lockedRow, row);
}
- CompletableFuture raftFut = rowsToUpdate.isEmpty() ? CompletableFuture.completedFuture(null)
- : applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowsToUpdate, txId));
+ if (rowsToUpdate.isEmpty()) {
+ return CompletableFuture.completedFuture(null);
+ }
- return raftFut.thenApply(ignored -> null);
+ return applyCmdWithExceptionHandling(new UpdateAllCommand(committedPartitionId, rowsToUpdate, txId))
+ .thenApply(ignored -> null);
});
}
default: {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("Unknown multi request [actionType={}]", request.requestType()));
+ format("Unknown multi request [actionType={}]", request.requestType()));
}
}
}
@@ -916,343 +896,238 @@ public class PartitionReplicaListener implements ReplicaListener {
TablePartitionId commitPartitionId = request.commitPartitionId();
assert commitPartitionId != null || request.requestType() == RequestType.RW_GET :
- "Commit partition partition is null [type=" + request.requestType() + ']';
-
- ByteBuffer searchKey = searchRow.keySlice();
-
- UUID indexId = indexIdOrDefault(indexPkId/*request.indexToUse()*/);
+ "Commit partition is null [type=" + request.requestType() + ']';
switch (request.requestType()) {
case RW_GET: {
- CompletableFuture<RowId> lockFut = takeLocksForGet(searchKey, indexId, txId);
-
- return lockFut.thenApply(lockedRowId -> {
- BinaryRow result = lockedRowId != null
- ? resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE), txId) : null;
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
- return result;
+ return takeLocksForGet(rowId, txId)
+ .thenApply(ignored -> row);
});
}
case RW_DELETE: {
- CompletableFuture<RowId> lockFut = takeLocksForDelete(searchKey, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- boolean removed = lockedRowId != null;
-
- CompletableFuture raftFut =
- removed ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRowId, txId)) :
- CompletableFuture.completedFuture(null);
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- return raftFut.thenApply(ignored -> removed);
+ return takeLocksForDelete(searchRow, rowId, txId)
+ .thenCompose(ignored -> applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, rowId, txId)))
+ .thenApply(ignored -> true);
});
}
case RW_GET_AND_DELETE: {
- CompletableFuture<RowId> lockFut = takeLocksForDelete(searchKey, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- BinaryRow lockedRow = lockedRowId != null
- ? resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE), txId) : null;
-
- CompletableFuture raftFut =
- lockedRowId != null ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRowId, txId))
- : CompletableFuture.completedFuture(null);
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
- return raftFut.thenApply(ignored -> lockedRow);
+ return takeLocksForDelete(searchRow, rowId, txId)
+ .thenCompose(ignored -> applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, rowId, txId)))
+ .thenApply(ignored -> row);
});
}
case RW_DELETE_EXACT: {
- CompletableFuture<RowId> lockFut = takeLocksForDeleteExact(searchKey, searchRow, indexId, txId);
-
- return lockFut.thenCompose(lockedRow -> {
- boolean removed = lockedRow != null;
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- CompletableFuture raftFut =
- removed ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRow, txId)) :
- CompletableFuture.completedFuture(null);
+ return takeLocksForDeleteExact(searchRow, rowId, row, txId)
+ .thenCompose(validatedRowId -> {
+ if (validatedRowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- return raftFut.thenApply(ignored -> removed);
+ return applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, validatedRowId, txId))
+ .thenApply(ignored -> true);
+ });
});
}
case RW_INSERT: {
- CompletableFuture<RowId> lockFut = takeLocksForInsert(searchKey, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- boolean inserted = lockedRowId == null;
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId != null) {
+ return CompletableFuture.completedFuture(false);
+ }
- CompletableFuture raftFut =
- lockedRowId == null ? applyCmdWithExceptionHandling(
- new UpdateCommand(commitPartitionId, new RowId(partId), searchRow, txId)) :
- CompletableFuture.completedFuture(null);
+ RowId rowId0 = new RowId(partId);
- return raftFut.thenApply(ignored -> inserted);
+ return takeLocksForInsert(searchRow, rowId0, txId)
+ .thenCompose(ignored -> applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, rowId0, searchRow, txId)))
+ .thenApply(ignored -> true);
});
}
case RW_UPSERT: {
- CompletableFuture<RowId> lockFut = takeLocksForUpsert(searchKey, indexId, txId);
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ boolean insert = rowId == null;
- return lockFut.thenCompose(lockedRowId -> {
- CompletableFuture raftFut = lockedRowId != null ? applyCmdWithExceptionHandling(
- new UpdateCommand(commitPartitionId, lockedRowId, searchRow, txId)) :
- applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, new RowId(partId), searchRow, txId));
+ RowId rowId0 = insert ? new RowId(partId) : rowId;
- return raftFut.thenApply(ignored -> null);
+ CompletableFuture<?> lockFut = insert
+ ? takeLocksForInsert(searchRow, rowId0, txId)
+ : takeLocksForUpdate(searchRow, rowId0, txId);
+
+ return lockFut
+ .thenCompose(ignored -> applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, rowId0, searchRow, txId)))
+ .thenApply(ignored -> null);
});
}
case RW_GET_AND_UPSERT: {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X)
- .thenCompose(idxLock -> { // Index X lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on table
- CompletableFuture<Lock> rowLockFut = (rowId != null)
- ? lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.X)
- // X lock on RowId
- : CompletableFuture.completedFuture(null);
-
- return rowLockFut.thenCompose(rowLock -> {
- BinaryRow result = rowId != null
- ? resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId) : null;
-
- CompletableFuture raftFut = rowId != null ? applyCmdWithExceptionHandling(
- new UpdateCommand(commitPartitionId, rowId, searchRow, txId))
- : applyCmdWithExceptionHandling(
- new UpdateCommand(commitPartitionId, new RowId(partId), searchRow, txId));
-
- return raftFut.thenApply(ignored -> result);
- });
- });
- });
- }
- case RW_GET_AND_REPLACE: {
- CompletableFuture<RowId> idxLockFut = lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S)
- .thenCompose(sharedIdxLock -> { // Index S lock
- RowId rowId = rowIdByKey(indexId, searchKey);
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ boolean insert = rowId == null;
- if (rowId != null) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X)
- .thenApply(exclusiveIdxLock -> rowId); // Index X lock
- }
+ RowId rowId0 = insert ? new RowId(partId) : rowId;
- return CompletableFuture.completedFuture(null);
- });
+ CompletableFuture<?> lockFut = insert
+ ? takeLocksForInsert(searchRow, rowId0, txId)
+ : takeLocksForUpdate(searchRow, rowId0, txId);
- return idxLockFut.thenCompose(lockedRowId -> {
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on table
- CompletableFuture<BinaryRow> rowLockFut;
-
- if (lockedRowId != null) {
- rowLockFut = lockManager.acquire(txId, new LockKey(tableId, lockedRowId), LockMode.X)
- .thenApply(rowLock -> // X lock on RowId
- resolveReadResult(mvDataStorage.read(lockedRowId, HybridTimestamp.MAX_VALUE), txId)
- );
- } else {
- rowLockFut = CompletableFuture.completedFuture(null);
- }
-
- return rowLockFut.thenCompose(lockedRow -> {
- CompletableFuture raftFut = lockedRow == null ? CompletableFuture.completedFuture(null) :
- applyCmdWithExceptionHandling(
- new UpdateCommand(commitPartitionId, lockedRowId, searchRow, txId));
+ return lockFut
+ .thenCompose(ignored -> applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, rowId0, searchRow, txId)))
+ .thenApply(ignored -> row);
+ });
+ }
+ case RW_GET_AND_REPLACE: {
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(null);
+ }
- return raftFut.thenApply(ignored -> lockedRow);
- });
- });
+ return takeLocksForUpdate(searchRow, rowId, txId)
+ .thenCompose(ignored -> applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, rowId, searchRow, txId)))
+ .thenApply(ignored0 -> row);
});
}
case RW_REPLACE_IF_EXIST: {
- CompletableFuture<RowId> lockFut = takeLocksForReplaceIfExist(searchKey, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- boolean replaced = lockedRowId != null;
-
- CompletableFuture raftFut =
- replaced ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRowId, searchRow, txId))
- : CompletableFuture.completedFuture(null);
+ return resolveRowByPk(searchRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- return raftFut.thenApply(ignored -> replaced);
+ return takeLocksForUpdate(searchRow, rowId, txId)
+ .thenCompose(ignored -> applyCmdWithExceptionHandling(
+ new UpdateCommand(commitPartitionId, rowId, searchRow, txId)))
+ .thenApply(ignored -> true);
});
}
default: {
throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("Unknown single request [actionType={}]", request.requestType()));
+ format("Unknown single request [actionType={}]", request.requestType()));
}
}
}
- /**
- * Takes all required locks on a key, before replacing.
- *
- * @param searchKey Key to search.
- * @param indexId Index id.
- * @param txId Transaction id.
- * @return Future completes with {@link RowId} or {@code null} if there is no entry.
- */
- private CompletableFuture<RowId> takeLocksForReplaceIfExist(ByteBuffer searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S).thenCompose(shareIdxLock -> { // Index R lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- CompletableFuture<Lock> idxLockFut = rowId != null
- ? lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X) // Index X lock
- : CompletableFuture.completedFuture(null);
-
- return idxLockFut.thenCompose(exclusiveIdxLock -> lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on table
- if (rowId != null) {
- RowId rowIdToLock = rowId;
-
- return lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.X)
- .thenApply(rowLock -> rowIdToLock); // X lock on RowId
- }
-
- return CompletableFuture.completedFuture(null);
- }));
- });
- }
-
/**
* Takes all required locks on a key, before upserting.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
* @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value.
*/
- private CompletableFuture<RowId> takeLocksForUpsert(ByteBuffer searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X).thenCompose(idxLock -> { // Index X lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on table
- if (rowId != null) {
- return lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.X)
- .thenApply(rowLock -> rowId); // X lock on RowId
- }
-
- return CompletableFuture.completedFuture(null);
- });
- });
+ private CompletableFuture<RowId> takeLocksForUpdate(BinaryRow tableRow, RowId rowId, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
+ .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.X))
+ .thenCompose(ignored -> takePutLockOnIndexes(tableRow, rowId, txId))
+ .thenApply(ignored -> rowId);
}
/**
* Takes all required locks on a key, before inserting the value.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
- * @param txId Transaction id.
+ * @param tableRow Table row.
+ * @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value.
*/
- private CompletableFuture<RowId> takeLocksForInsert(ByteBuffer searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S) // Index S lock
- .thenCompose(sharedIdxLock -> {
- RowId rowId = rowIdByKey(indexId, searchKey);
+ private CompletableFuture<RowId> takeLocksForInsert(BinaryRow tableRow, RowId rowId, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
+ .thenCompose(ignored -> takePutLockOnIndexes(tableRow, rowId, txId))
+ .thenApply(tblLock -> rowId);
+ }
- if (rowId == null) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X) // Index X lock
- .thenCompose(exclusiveIdxLock ->
- lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
- .thenApply(tblLock -> null));
- }
+ private CompletableFuture<?> takePutLockOnIndexes(BinaryRow tableRow, RowId rowId, UUID txId) {
+ Collection<IndexLocker> indexes = indexesLockers.get().values();
- return CompletableFuture.completedFuture(rowId);
- });
+ if (nullOrEmpty(indexes)) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
+ int idx = 0;
+
+ for (IndexLocker locker : indexes) {
+ locks[idx++] = locker.locksForInsert(txId, tableRow, rowId);
+ }
+
+ return CompletableFuture.allOf(locks);
+ }
+
+ private CompletableFuture<?> takeRemoveLockOnIndexes(BinaryRow tableRow, RowId rowId, UUID txId) {
+ Collection<IndexLocker> indexes = indexesLockers.get().values();
+
+ if (nullOrEmpty(indexes)) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ CompletableFuture<?>[] locks = new CompletableFuture[indexes.size()];
+ int idx = 0;
+
+ for (IndexLocker locker : indexes) {
+ locks[idx++] = locker.locksForRemove(txId, tableRow, rowId);
+ }
+
+ return CompletableFuture.allOf(locks);
}
/**
* Takes all required locks on a key, before deleting the value.
*
- * @param searchKey Key to search.
- * @param searchRow Row to remove.
- * @param indexId Index id.
* @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value for remove.
*/
- private CompletableFuture<RowId> takeLocksForDeleteExact(ByteBuffer searchKey, BinaryRow searchRow, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X).thenCompose(idxLock -> { // Index X lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
- .thenCompose(tblLock -> {
- CompletableFuture<RowId> rowLockFut;
-
- if (rowId != null) {
- rowLockFut = lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.S) // S lock on RowId
- .thenCompose(sharedRowLock -> {
- BinaryRow curVal = resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId);
-
- if (equalValues(curVal, searchRow)) {
- return lockManager.acquire(txId, new LockKey(tableId, rowId),
- LockMode.X) // X lock on RowId
- .thenApply(exclusiveRowLock -> rowId);
- }
-
- return CompletableFuture.completedFuture(null);
- });
- } else {
- rowLockFut = CompletableFuture.completedFuture(null);
- }
+ private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
+ .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.S)) // S lock on RowId
+ .thenCompose(ignored -> {
+ if (equalValues(actualRow, expectedRow)) {
+ return lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.X) // X lock on RowId
+ .thenCompose(ignored0 -> takeRemoveLockOnIndexes(actualRow, rowId, txId))
+ .thenApply(exclusiveRowLock -> rowId);
+ }
- return rowLockFut;
- });
- });
+ return CompletableFuture.completedFuture(null);
+ });
}
/**
* Takes all required locks on a key, before deleting the value.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
* @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value for the key.
*/
- private CompletableFuture<RowId> takeLocksForDelete(ByteBuffer searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X).thenCompose(idxLock -> { // Index X lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
- .thenCompose(tblLock -> {
- if (rowId != null) {
- return lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.S) // S lock on RowId
- .thenCompose(sharedRowLock -> {
- BinaryRow curVal = resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId);
-
- if (curVal != null) {
- return lockManager.acquire(txId, new LockKey(tableId, rowId),
- LockMode.X) // X lock on RowId
- .thenApply(exclusiveRowLock -> rowId);
- }
-
- return CompletableFuture.completedFuture(null);
- });
- }
-
- return CompletableFuture.completedFuture(null);
- });
- });
+ private CompletableFuture<RowId> takeLocksForDelete(BinaryRow tableRow, RowId rowId, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX) // IX lock on table
+ .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.X)) // X lock on RowId
+ .thenCompose(ignored -> takeRemoveLockOnIndexes(tableRow, rowId, txId))
+ .thenApply(ignored -> rowId);
}
/**
* Takes all required locks on a key, before getting the value.
*
- * @param searchKey Key to search.
- * @param indexId Index id.
* @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value for the key.
*/
- private CompletableFuture<RowId> takeLocksForGet(ByteBuffer searchKey, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S).thenCompose(idxLock -> { // Index S lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- return lockManager.acquire(txId, new LockKey(tableId), LockMode.IS).thenCompose(tblLock -> { // IS lock on table
- if (rowId != null) {
- return lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.S) // S lock on RowId
- .thenApply(rowLock -> rowId);
- }
-
- return CompletableFuture.completedFuture(null);
- });
- });
+ private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId), LockMode.IS) // IS lock on table
+ .thenCompose(tblLock -> lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.S)) // S lock on RowId
+ .thenApply(ignored -> rowId);
}
/**
@@ -1262,80 +1137,55 @@ public class PartitionReplicaListener implements ReplicaListener {
* @return Listener response.
*/
private CompletableFuture<Object> processTwoEntriesAction(ReadWriteSwapRowReplicaRequest request) {
- BinaryRow searchRow = request.binaryRow();
- BinaryRow oldRow = request.oldBinaryRow();
+ BinaryRow newRow = request.binaryRow();
+ BinaryRow expectedRow = request.oldBinaryRow();
TablePartitionId commitPartitionId = request.commitPartitionId();
assert commitPartitionId != null : "Commit partition partition is null [type=" + request.requestType() + ']';
- ByteBuffer searchKey = searchRow.keySlice();
-
- UUID indexId = indexIdOrDefault(indexPkId/*request.indexToUse()*/);
-
UUID txId = request.transactionId();
- switch (request.requestType()) {
- case RW_REPLACE: {
- CompletableFuture<RowId> lockFut = takeLocsForReplace(searchKey, oldRow, indexId, txId);
-
- return lockFut.thenCompose(lockedRowId -> {
- boolean replaced = lockedRowId != null;
+ if (request.requestType() == RequestType.RW_REPLACE) {
+ return resolveRowByPk(newRow, txId, (rowId, row) -> {
+ if (rowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- CompletableFuture raftFut =
- replaced ? applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, lockedRowId, searchRow, txId))
- : CompletableFuture.completedFuture(null);
+ return takeLocksForReplace(expectedRow, row, newRow, rowId, txId)
+ .thenCompose(validatedRowId -> {
+ if (validatedRowId == null) {
+ return CompletableFuture.completedFuture(false);
+ }
- return raftFut.thenApply(ignored -> replaced);
- });
- }
- default: {
- throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
- IgniteStringFormatter.format("Unknown two actions operation [actionType={}]", request.requestType()));
- }
+ return applyCmdWithExceptionHandling(new UpdateCommand(commitPartitionId, validatedRowId, newRow, txId))
+ .thenApply(ignored -> true);
+ });
+ });
}
+
+ throw new IgniteInternalException(Replicator.REPLICA_COMMON_ERR,
+ format("Unknown two actions operation [actionType={}]", request.requestType()));
}
/**
* Takes all required locks on a key, before updating the value.
*
- * @param searchKey Key to search.
- * @param oldRow Old row that is expected.
- * @param indexId Index id.
* @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no suitable row.
*/
- private CompletableFuture<RowId> takeLocsForReplace(ByteBuffer searchKey, BinaryRow oldRow, UUID indexId, UUID txId) {
- return lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.S).thenCompose(shareIdxLock -> { // Index R lock
- RowId rowId = rowIdByKey(indexId, searchKey);
-
- CompletableFuture<Lock> idxLockFut = rowId != null
- ? lockManager.acquire(txId, new LockKey(indexId, searchKey), LockMode.X) // Index X lock
- : CompletableFuture.completedFuture(null);
-
- return idxLockFut.thenCompose(exclusiveIdxLock -> lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
- .thenCompose(tblLock -> { // IX lock on table
- CompletableFuture<RowId> rowLockFut;
-
- if (rowId != null) {
- rowLockFut = lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.S) // S lock on RowId
- .thenCompose(sharedRowLock -> {
- BinaryRow curVal = resolveReadResult(mvDataStorage.read(rowId, HybridTimestamp.MAX_VALUE), txId);
-
- if (equalValues(curVal, oldRow)) {
- return lockManager.acquire(txId, new LockKey(tableId, rowId),
- LockMode.X) // X lock on RowId
- .thenApply(rowLock -> rowId);
- }
-
- return CompletableFuture.completedFuture(null);
- });
- } else {
- rowLockFut = CompletableFuture.completedFuture(null);
- }
+ private CompletableFuture<RowId> takeLocksForReplace(BinaryRow expectedRow, BinaryRow oldRow,
+ BinaryRow newRow, RowId rowId, UUID txId) {
+ return lockManager.acquire(txId, new LockKey(tableId), LockMode.IX)
+ .thenCompose(ignored -> lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.S))
+ .thenCompose(ignored -> {
+ if (oldRow != null && equalValues(oldRow, expectedRow)) {
+ return lockManager.acquire(txId, new LockKey(tableId, rowId), LockMode.X) // X lock on RowId
+ .thenCompose(ignored1 -> takePutLockOnIndexes(newRow, rowId, txId))
+ .thenApply(rowLock -> rowId);
+ }
- return rowLockFut;
- }));
- });
+ return CompletableFuture.completedFuture(null);
+ });
}
/**
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index c9d53a903b..37aa582747 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -390,24 +390,24 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<R> result = new CompletableFuture();
enlist(partId, tx).<R>thenCompose(
- primaryReplicaAndTerm -> {
- try {
- return replicaSvc.invoke(
- primaryReplicaAndTerm.get1(),
- requestFunction.apply((TablePartitionId) tx.commitPartition(), primaryReplicaAndTerm.get2())
- );
- } catch (PrimaryReplicaMissException e) {
- throw new TransactionException(e);
- } catch (Throwable e) {
- throw new TransactionException(
- IgniteStringFormatter.format(
- "Failed to enlist partition[tableName={}, partId={}] into a transaction",
- tableName,
- partId
- )
- );
- }
- })
+ primaryReplicaAndTerm -> {
+ try {
+ return replicaSvc.invoke(
+ primaryReplicaAndTerm.get1(),
+ requestFunction.apply((TablePartitionId) tx.commitPartition(), primaryReplicaAndTerm.get2())
+ );
+ } catch (PrimaryReplicaMissException e) {
+ throw new TransactionException(e);
+ } catch (Throwable e) {
+ throw new TransactionException(
+ IgniteStringFormatter.format(
+ "Failed to enlist partition[tableName={}, partId={}] into a transaction",
+ tableName,
+ partId
+ )
+ );
+ }
+ })
.handle((res0, e) -> {
if (e != null) {
if (e.getCause() instanceof PrimaryReplicaMissException && attempts > 0) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java b/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
index a4fa7f06bb..0777772de0 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/Example.java
@@ -28,6 +28,7 @@ import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.NullableValue;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
@@ -50,7 +51,8 @@ public class Example {
* Returns table implementation.
*/
private static List<Table> tableFactory() {
- return Collections.singletonList(new TableImpl(new DummyInternalTableImpl(Mockito.mock(ReplicaService.class)), null));
+ return Collections.singletonList(
+ new TableImpl(new DummyInternalTableImpl(Mockito.mock(ReplicaService.class)), new HeapLockManager(), List::of));
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java
index 0109fd9d63..5d6a51991f 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/InteropOperationsTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.RecordMarshallerTest;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.KeyValueView;
@@ -116,7 +117,7 @@ public class InteropOperationsTest {
Mockito.when(clusterService.messagingService()).thenReturn(Mockito.mock(MessagingService.class, RETURNS_DEEP_STUBS));
- TABLE = new TableImpl(INT_TABLE, schemaRegistry);
+ TABLE = new TableImpl(INT_TABLE, schemaRegistry, new HeapLockManager());
KV_BIN_VIEW = new KeyValueBinaryViewImpl(INT_TABLE, schemaRegistry);
KV_VIEW = new KeyValueViewImpl<Long, Value>(INT_TABLE, schemaRegistry,
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
index 0a7142930a..433409d9c8 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueBinaryViewOperationsTest.java
@@ -33,6 +33,7 @@ import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
import org.apache.ignite.table.KeyValueView;
@@ -457,7 +458,7 @@ public class KeyValueBinaryViewOperationsTest {
Mockito.when(clusterService.messagingService()).thenReturn(Mockito.mock(MessagingService.class, RETURNS_DEEP_STUBS));
- return new TableImpl(table, new DummySchemaManagerImpl(schema));
+ return new TableImpl(table, new DummySchemaManagerImpl(schema), new HeapLockManager());
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java
index ba9db012c8..0c0d6ca7b9 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/KeyValueViewOperationsSimpleSchemaTest.java
@@ -42,6 +42,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaTestUtils;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.MarshallerException;
import org.apache.ignite.lang.UnexpectedNullValueException;
import org.apache.ignite.network.ClusterService;
@@ -712,6 +713,6 @@ public class KeyValueViewOperationsSimpleSchemaTest {
Mockito.when(clusterService.messagingService()).thenReturn(Mockito.mock(MessagingService.class, RETURNS_DEEP_STUBS));
- return new TableImpl(table, new DummySchemaManagerImpl(schema));
+ return new TableImpl(table, new DummySchemaManagerImpl(schema), new HeapLockManager());
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
index e4fba77f67..15065dc213 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/RecordBinaryViewOperationsTest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.schema.SchemaMismatchException;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
import org.apache.ignite.internal.table.impl.TestTupleBuilder;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
@@ -688,7 +689,7 @@ public class RecordBinaryViewOperationsTest {
Mockito.when(clusterService.messagingService()).thenReturn(Mockito.mock(MessagingService.class, RETURNS_DEEP_STUBS));
- return new TableImpl(table, new DummySchemaManagerImpl(schema));
+ return new TableImpl(table, new DummySchemaManagerImpl(schema), new HeapLockManager());
}
private <T extends Throwable> void assertThrowsWithCause(Class<T> expectedType, Executable executable) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/SchemaValidationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/SchemaValidationTest.java
index 8fa2d0f4da..5629e1bfa1 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/SchemaValidationTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/SchemaValidationTest.java
@@ -30,6 +30,7 @@ import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaMismatchException;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.table.impl.DummySchemaManagerImpl;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.MessagingService;
@@ -190,7 +191,7 @@ public class SchemaValidationTest {
}
private TableImpl createTableImpl(SchemaDescriptor schema) {
- return new TableImpl(createTable(), new DummySchemaManagerImpl(schema));
+ return new TableImpl(createTable(), new DummySchemaManagerImpl(schema), new HeapLockManager());
}
private <T extends Throwable> void assertThrowsWithCause(Class<T> expectedType, Executable executable) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
index cb0bdcab87..92ea9c80bf 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TxLocalTest.java
@@ -83,11 +83,11 @@ public class TxLocalTest extends TxAbstractTest {
DummyInternalTableImpl table = new DummyInternalTableImpl(replicaSvc, txManager, true);
- accounts = new TableImpl(table, new DummySchemaManagerImpl(ACCOUNTS_SCHEMA));
+ accounts = new TableImpl(table, new DummySchemaManagerImpl(ACCOUNTS_SCHEMA), lockManager);
DummyInternalTableImpl table2 = new DummyInternalTableImpl(replicaSvc, txManager, true);
- customers = new TableImpl(table2, new DummySchemaManagerImpl(CUSTOMERS_SCHEMA));
+ customers = new TableImpl(table2, new DummySchemaManagerImpl(CUSTOMERS_SCHEMA), lockManager);
when(clusterService.messagingService()).thenReturn(mock(MessagingService.class, RETURNS_DEEP_STUBS));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 91e59b116b..da6c0159ab 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -607,6 +607,7 @@ public class TableManagerTest extends IgniteAbstractTest {
CountDownLatch createTblLatch = new CountDownLatch(1);
tableManager.listen(TableEvent.CREATE, (parameters, exception) -> {
+ parameters.table().pkId(UUID.randomUUID());
createTblLatch.countDown();
return completedFuture(true);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index 23555da62c..db1a922efe 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -26,15 +26,14 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
@@ -43,6 +42,9 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -52,6 +54,9 @@ import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
@@ -59,10 +64,11 @@ import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
-import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.network.ClusterService;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.BeforeEach;
@@ -93,13 +99,22 @@ public class PartitionCommandListenerTest {
private PartitionListener commandListener;
/** RAFT index. */
- private AtomicLong raftIndex = new AtomicLong();
+ private final AtomicLong raftIndex = new AtomicLong();
/** Primary index. */
- private ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
+ private final TableSchemaAwareIndexStorage pkStorage = new TableSchemaAwareIndexStorage(
+ UUID.randomUUID(),
+ new TestHashIndexStorage(null),
+ tableRow -> new BinaryTuple(
+ BinaryTupleSchema.create(new Element[]{
+ new Element(NativeTypes.BYTES, false)
+ }),
+ tableRow.keySlice()
+ )
+ );
/** Partition storage. */
- private MvPartitionStorage mvPartitionStorage = new TestMvPartitionStorage(PARTITION_ID);
+ private final MvPartitionStorage mvPartitionStorage = new TestMvPartitionStorage(PARTITION_ID);
/**
* Initializes a table listener before tests.
@@ -116,7 +131,7 @@ public class PartitionCommandListenerTest {
mvPartitionStorage,
new TestConcurrentHashMapTxStateStorage(),
new TxManagerImpl(replicaService, new HeapLockManager(), new HybridClock()),
- primaryIndex
+ () -> Map.of(pkStorage.id(), pkStorage)
);
}
@@ -255,33 +270,18 @@ public class PartitionCommandListenerTest {
* Inserts all rows.
*/
private void insertAll() {
- List<IgniteBiTuple<Row, UUID>> txs = new ArrayList<>();
-
- commandListener.onWrite(batchIterator(clo -> {
- when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
- doAnswer(invocation -> {
- assertNull(invocation.getArgument(0));
-
- return null;
- }).when(clo).result(any());
+ HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT);
+ UUID txId = Timestamp.nextVersion().toUuid();
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
- HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT);
- UUID txId = Timestamp.nextVersion().toUuid();
- var commitPartId = new TablePartitionId(txId, PARTITION_ID);
-
- for (int i = 0; i < KEY_COUNT; i++) {
- Row row = getTestRow(i, i);
-
- rows.put(new RowId(PARTITION_ID), row);
-
- txs.add(new IgniteBiTuple<>(row, txId));
- }
+ for (int i = 0; i < KEY_COUNT; i++) {
+ Row row = getTestRow(i, i);
- when(clo.command()).thenReturn(new UpdateAllCommand(commitPartId, rows, txId));
- }));
+ rows.put(new RowId(PARTITION_ID), row);
+ }
- txs.forEach(tuple -> mvPartitionStorage.commitWrite(primaryIndex.get(tuple.getKey().keySlice()), CLOCK.now()));
+ invokeBatchedCommand(new UpdateAllCommand(commitPartId, rows, txId));
+ invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()));
}
/**
@@ -290,69 +290,36 @@ public class PartitionCommandListenerTest {
* @param keyValueMapper Mep a value to update to the iter number.
*/
private void updateAll(Function<Integer, Integer> keyValueMapper) {
- List<IgniteBiTuple<Row, UUID>> txs = new ArrayList<>();
+ UUID txId = Timestamp.nextVersion().toUuid();
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
+ HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT);
- commandListener.onWrite(batchIterator(clo -> {
- when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
- doAnswer(invocation -> {
- assertNull(invocation.getArgument(0));
-
- return null;
- }).when(clo).result(any());
-
- HashMap<RowId, BinaryRow> rows = new HashMap<>(KEY_COUNT);
-
- UUID txId = Timestamp.nextVersion().toUuid();
- var commitPartId = new TablePartitionId(txId, PARTITION_ID);
-
- for (int i = 0; i < KEY_COUNT; i++) {
- Row row = getTestRow(i, keyValueMapper.apply(i));
-
- rows.put(primaryIndex.get(row.keySlice()), row);
-
- txs.add(new IgniteBiTuple<>(row, txId));
- }
+ for (int i = 0; i < KEY_COUNT; i++) {
+ Row row = getTestRow(i, keyValueMapper.apply(i));
- when(clo.command()).thenReturn(new UpdateAllCommand(commitPartId, rows, txId));
- }));
+ rows.put(readRow(row), row);
+ }
- txs.forEach(tuple -> mvPartitionStorage.commitWrite(primaryIndex.get(tuple.getKey().keySlice()), CLOCK.now()));
+ invokeBatchedCommand(new UpdateAllCommand(commitPartId, rows, txId));
+ invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()));
}
/**
* Deletes all rows.
*/
private void deleteAll() {
- List<IgniteBiTuple<Row, UUID>> txs = new ArrayList<>();
+ UUID txId = Timestamp.nextVersion().toUuid();
+ var commitPartId = new TablePartitionId(txId, PARTITION_ID);
+ Set<RowId> keyRows = new HashSet<>(KEY_COUNT);
- commandListener.onWrite(batchIterator(clo -> {
- when(clo.index()).thenReturn(raftIndex.incrementAndGet());
-
- doAnswer(invocation -> {
- assertNull(invocation.getArgument(0));
-
- return null;
- }).when(clo).result(any());
-
- Set<RowId> keyRows = new HashSet<>(KEY_COUNT);
-
- UUID txId = Timestamp.nextVersion().toUuid();
- var commitPartId = new TablePartitionId(txId, PARTITION_ID);
-
- for (int i = 0; i < KEY_COUNT; i++) {
- Row row = getTestRow(i, i);
-
- keyRows.add(primaryIndex.get(row.keySlice()));
-
- txs.add(new IgniteBiTuple<>(row, txId));
- }
+ for (int i = 0; i < KEY_COUNT; i++) {
+ Row row = getTestRow(i, i);
- when(clo.command()).thenReturn(new UpdateAllCommand(commitPartId, keyRows, txId));
- }));
+ keyRows.add(readRow(row));
+ }
- txs.forEach(
- tuple -> mvPartitionStorage.commitWrite(primaryIndex.remove(tuple.getKey().keySlice()), CLOCK.now()));
+ invokeBatchedCommand(new UpdateAllCommand(commitPartId, keyRows, txId));
+ invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now()));
}
/**
@@ -361,17 +328,17 @@ public class PartitionCommandListenerTest {
* @param keyValueMapper Mep a value to update to the iter number.
*/
private void update(Function<Integer, Integer> keyValueMapper) {
- List<IgniteBiTuple<Row, UUID>> txs = new ArrayList<>();
+ List<UUID> txIds = new ArrayList<>();
commandListener.onWrite(iterator((i, clo) -> {
UUID txId = Timestamp.nextVersion().toUuid();
Row row = getTestRow(i, keyValueMapper.apply(i));
- RowId rowId = primaryIndex.get(row.keySlice());
+ RowId rowId = readRow(row);
var commitPartId = new TablePartitionId(txId, PARTITION_ID);
assertNotNull(rowId);
- txs.add(new IgniteBiTuple<>(row, txId));
+ txIds.add(txId);
when(clo.index()).thenReturn(raftIndex.incrementAndGet());
@@ -384,24 +351,24 @@ public class PartitionCommandListenerTest {
}).when(clo).result(any());
}));
- txs.forEach(tuple -> mvPartitionStorage.commitWrite(primaryIndex.get(tuple.getKey().keySlice()), CLOCK.now()));
+ txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now())));
}
/**
* Deletes row.
*/
private void delete() {
- List<IgniteBiTuple<Row, UUID>> txs = new ArrayList<>();
+ List<UUID> txIds = new ArrayList<>();
commandListener.onWrite(iterator((i, clo) -> {
UUID txId = Timestamp.nextVersion().toUuid();
Row row = getTestRow(i, i);
- RowId rowId = primaryIndex.get(row.keySlice());
+ RowId rowId = readRow(row);
var commitPartId = new TablePartitionId(txId, PARTITION_ID);
assertNotNull(rowId);
- txs.add(new IgniteBiTuple<>(row, txId));
+ txIds.add(txId);
when(clo.index()).thenReturn(raftIndex.incrementAndGet());
@@ -414,8 +381,7 @@ public class PartitionCommandListenerTest {
}).when(clo).result(any());
}));
- txs.forEach(
- tuple -> mvPartitionStorage.commitWrite(primaryIndex.remove(tuple.getKey().keySlice()), CLOCK.now()));
+ txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now())));
}
/**
@@ -437,7 +403,7 @@ public class PartitionCommandListenerTest {
for (int i = 0; i < KEY_COUNT; i++) {
Row keyRow = getTestKey(i);
- RowId rowId = primaryIndex.get(keyRow.keySlice());
+ RowId rowId = readRow(keyRow);
if (existed) {
ReadResult readResult = mvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE);
@@ -456,13 +422,13 @@ public class PartitionCommandListenerTest {
* Inserts row.
*/
private void insert() {
- List<IgniteBiTuple<Row, UUID>> txs = new ArrayList<>();
+ List<UUID> txIds = new ArrayList<>();
commandListener.onWrite(iterator((i, clo) -> {
UUID txId = Timestamp.nextVersion().toUuid();
Row row = getTestRow(i, i);
var commitPartId = new TablePartitionId(txId, PARTITION_ID);
- txs.add(new IgniteBiTuple<>(row, txId));
+ txIds.add(txId);
when(clo.index()).thenReturn(raftIndex.incrementAndGet());
@@ -475,7 +441,7 @@ public class PartitionCommandListenerTest {
}).when(clo).result(any());
}));
- txs.forEach(tuple -> mvPartitionStorage.commitWrite(primaryIndex.get(tuple.getKey().keySlice()), CLOCK.now()));
+ txIds.forEach(txId -> invokeBatchedCommand(new TxCleanupCommand(txId, true, CLOCK.now())));
}
/**
@@ -506,4 +472,36 @@ public class PartitionCommandListenerTest {
return new Row(SCHEMA, rowBuilder.build());
}
+
+ private void invokeBatchedCommand(WriteCommand cmd) {
+ commandListener.onWrite(batchIterator(clo -> {
+ when(clo.index()).thenReturn(raftIndex.incrementAndGet());
+
+ doAnswer(invocation -> {
+ assertNull(invocation.getArgument(0));
+
+ return null;
+ }).when(clo).result(any());
+
+ when(clo.command()).thenReturn(cmd);
+ }));
+ }
+
+ private RowId readRow(BinaryRow tableRow) {
+ try (Cursor<RowId> cursor = pkStorage.get(tableRow)) {
+ while (cursor.hasNext()) {
+ RowId rowId = cursor.next();
+
+ ReadResult readResult = mvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE);
+
+ if (!readResult.isEmpty() && readResult.binaryRow() != null) {
+ return rowId;
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ return null;
+ }
}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
new file mode 100644
index 0000000000..b3f59b0f1c
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replication;
+
+import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.schema.BinaryConverter;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
+import org.apache.ignite.internal.schema.marshaller.MarshallerException;
+import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
+import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
+import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.SortedIndexLocker;
+import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
+import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
+import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
+import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
+import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
+import org.apache.ignite.internal.testframework.IgniteAbstractTest;
+import org.apache.ignite.internal.tx.Lock;
+import org.apache.ignite.internal.tx.LockManager;
+import org.apache.ignite.internal.tx.LockMode;
+import org.apache.ignite.internal.tx.Timestamp;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.HeapLockManager;
+import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.internal.util.Pair;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.TopologyService;
+import org.apache.ignite.raft.client.service.RaftGroupService;
+import org.hamcrest.CustomMatcher;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/** There are tests for partition replica listener. */
+public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest {
+ private static final int PART_ID = 0;
+ private static final UUID TABLE_ID = new UUID(0L, 0L);
+ private static final UUID PK_INDEX_ID = new UUID(0L, 1L);
+ private static final UUID HASH_INDEX_ID = new UUID(0L, 2L);
+ private static final UUID SORTED_INDEX_ID = new UUID(0L, 3L);
+ private static final UUID TRANSACTION_ID = Timestamp.nextVersion().toUuid();
+ private static final HybridClock CLOCK = new HybridClock();
+ private static final LockManager LOCK_MANAGER = new HeapLockManager();
+ private static final TablePartitionId PARTITION_ID = new TablePartitionId(TABLE_ID, PART_ID);
+ private static final TableMessagesFactory TABLE_MESSAGES_FACTORY = new TableMessagesFactory();
+ private static final TestMvPartitionStorage TEST_MV_PARTITION_STORAGE = new TestMvPartitionStorage(PART_ID);
+
+ private static SchemaDescriptor schemaDescriptor;
+ private static KvMarshaller<Integer, Integer> kvMarshaller;
+ private static Lazy<TableSchemaAwareIndexStorage> pkStorage;
+ private static PartitionReplicaListener partitionReplicaListener;
+ private static Function<BinaryRow, BinaryTuple> row2HashKeyConverter;
+ private static Function<BinaryRow, BinaryTuple> row2SortKeyConverter;
+
+ @BeforeAll
+ private static void beforeAll() {
+ RaftGroupService mockRaftClient = mock(RaftGroupService.class);
+
+ when(mockRaftClient.refreshAndGetLeaderWithTerm())
+ .thenAnswer(invocationOnMock -> CompletableFuture.completedFuture(new IgniteBiTuple<>(null, 1L)));
+ when(mockRaftClient.run(any()))
+ .thenAnswer(invocationOnMock -> CompletableFuture.completedFuture(null));
+
+ BinaryTupleSchema tupleSchema = BinaryTupleSchema.create(new Element[]{
+ new Element(NativeTypes.INT32, false)
+ });
+
+ schemaDescriptor = new SchemaDescriptor(1, new Column[]{
+ new Column("id".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
+ }, new Column[]{
+ new Column("val".toUpperCase(Locale.ROOT), NativeTypes.INT32, false),
+ });
+
+ row2HashKeyConverter = tableRow -> new BinaryTuple(
+ tupleSchema,
+ BinaryConverter.forKey(schemaDescriptor).toTuple(tableRow)
+ );
+
+ pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
+ PK_INDEX_ID,
+ new TestHashIndexStorage(null),
+ row2HashKeyConverter
+ ));
+
+ IndexLocker pkLocker = new HashIndexLocker(PK_INDEX_ID, true, LOCK_MANAGER, row2HashKeyConverter);
+ IndexLocker hashIndexLocker = new HashIndexLocker(HASH_INDEX_ID, false, LOCK_MANAGER, row2HashKeyConverter);
+
+ row2SortKeyConverter = tableRow -> new BinaryTuple(
+ tupleSchema,
+ BinaryConverter.forValue(schemaDescriptor).toTuple(tableRow)
+ );
+
+ IndexLocker sortedIndexLocker = new SortedIndexLocker(
+ SORTED_INDEX_ID,
+ LOCK_MANAGER,
+ new TestSortedIndexStorage(
+ new SortedIndexDescriptor(
+ SORTED_INDEX_ID,
+ List.of(new SortedIndexColumnDescriptor(
+ "val", NativeTypes.INT32, false, true
+ ))
+ )
+ ),
+ row2SortKeyConverter
+ );
+
+ partitionReplicaListener = new PartitionReplicaListener(
+ TEST_MV_PARTITION_STORAGE,
+ mockRaftClient,
+ mock(TxManager.class),
+ LOCK_MANAGER,
+ PART_ID,
+ TABLE_ID,
+ () -> Map.of(
+ pkLocker.id(), pkLocker,
+ hashIndexLocker.id(), hashIndexLocker,
+ sortedIndexLocker.id(), sortedIndexLocker
+ ),
+ pkStorage,
+ CLOCK,
+ new TestConcurrentHashMapTxStateStorage(),
+ mock(TopologyService.class),
+ mock(PlacementDriver.class)
+ );
+
+ kvMarshaller = new ReflectionMarshallerFactory().create(schemaDescriptor, Integer.class, Integer.class);
+ }
+
+ @BeforeEach
+ private void beforeTest() {
+ ((TestHashIndexStorage) pkStorage.get().storage()).destroy();
+ TEST_MV_PARTITION_STORAGE.clear();
+
+ locks().forEach(LOCK_MANAGER::release);
+ }
+
+ /** Verifies the mode in which the lock was acquired on the index key for a particular operation. */
+ @ParameterizedTest
+ @MethodSource("readWriteSingleTestArguments")
+ public void testReadWriteSingle(ReadWriteTestArg arg) {
+ BinaryRow testBinaryRow = binaryRow(1, 1);
+
+ if (arg.type != RequestType.RW_INSERT) {
+ var rowId = new RowId(PART_ID);
+ insertRows(List.of(new Pair<>(testBinaryRow, rowId)), Timestamp.nextVersion().toUuid());
+ }
+
+ CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteSingleRowReplicaRequest()
+ .groupId(PARTITION_ID)
+ .term(1L)
+ .commitPartitionId(PARTITION_ID)
+ .transactionId(TRANSACTION_ID)
+ .binaryRow(testBinaryRow)
+ .requestType(arg.type)
+ .build());
+
+ await(fut);
+
+ assertThat(
+ locks(),
+ allOf(
+ hasItem(lockThat(
+ arg.expectedLockOnUniqueHash + " on unique hash index",
+ lock -> PK_INDEX_ID.equals(lock.lockKey().contextId())
+ && lock.lockMode() == arg.expectedLockOnUniqueHash
+ )),
+ hasItem(lockThat(
+ arg.expectedLockOnNonUniqueHash + " on non unique hash index",
+ lock -> HASH_INDEX_ID.equals(lock.lockKey().contextId())
+ && lock.lockMode() == arg.expectedLockOnNonUniqueHash
+ )),
+ hasItem(lockThat(
+ arg.expectedLockOnSort + " on sorted index",
+ lock -> SORTED_INDEX_ID.equals(lock.lockKey().contextId())
+ && lock.lockMode() == arg.expectedLockOnSort
+ ))
+ )
+ );
+ }
+
+ /** Verifies the mode in which the lock was acquired on the index key for a particular operation. */
+ @ParameterizedTest
+ @MethodSource("readWriteMultiTestArguments")
+ public void testReadWriteMulti(ReadWriteTestArg arg) {
+ List<BinaryRow> rows = List.of(binaryRow(1, 1), binaryRow(2, 2), binaryRow(3, 3));
+
+ if (arg.type != RequestType.RW_INSERT_ALL) {
+ for (BinaryRow row : rows) {
+ var rowId = new RowId(PART_ID);
+ insertRows(List.of(new Pair<>(row, rowId)), Timestamp.nextVersion().toUuid());
+ }
+ }
+
+ CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
+ .groupId(PARTITION_ID)
+ .term(1L)
+ .commitPartitionId(PARTITION_ID)
+ .transactionId(TRANSACTION_ID)
+ .binaryRows(rows)
+ .requestType(arg.type)
+ .build());
+
+ await(fut);
+
+ for (BinaryRow row : rows) {
+ assertThat(
+ locks(),
+ allOf(
+ hasItem(lockThat(
+ arg.expectedLockOnUniqueHash + " on unique hash index",
+ lock -> PK_INDEX_ID.equals(lock.lockKey().contextId())
+ && row2HashKeyConverter.apply(row).byteBuffer().equals(lock.lockKey().key())
+ && lock.lockMode() == arg.expectedLockOnUniqueHash
+ )),
+ hasItem(lockThat(
+ arg.expectedLockOnNonUniqueHash + " on non unique hash index",
+ lock -> HASH_INDEX_ID.equals(lock.lockKey().contextId())
+ && row2HashKeyConverter.apply(row).byteBuffer().equals(lock.lockKey().key())
+ && lock.lockMode() == arg.expectedLockOnNonUniqueHash
+ )),
+ hasItem(lockThat(
+ arg.expectedLockOnSort + " on sorted index",
+ lock -> SORTED_INDEX_ID.equals(lock.lockKey().contextId())
+ && row2SortKeyConverter.apply(row).byteBuffer().equals(lock.lockKey().key())
+ && lock.lockMode() == arg.expectedLockOnSort
+ ))
+ )
+ );
+ }
+ }
+
+ private static Iterable<ReadWriteTestArg> readWriteSingleTestArguments() {
+ return List.of(
+ new ReadWriteTestArg(RequestType.RW_DELETE, LockMode.X, LockMode.IX, LockMode.IX),
+ new ReadWriteTestArg(RequestType.RW_DELETE_EXACT, LockMode.X, LockMode.IX, LockMode.IX),
+ new ReadWriteTestArg(RequestType.RW_INSERT, LockMode.X, LockMode.IX, LockMode.X),
+ new ReadWriteTestArg(RequestType.RW_UPSERT, LockMode.X, LockMode.IX, LockMode.X),
+ new ReadWriteTestArg(RequestType.RW_REPLACE_IF_EXIST, LockMode.X, LockMode.IX, LockMode.X),
+
+ new ReadWriteTestArg(RequestType.RW_GET_AND_DELETE, LockMode.X, LockMode.IX, LockMode.IX),
+ new ReadWriteTestArg(RequestType.RW_GET_AND_REPLACE, LockMode.X, LockMode.IX, LockMode.X),
+ new ReadWriteTestArg(RequestType.RW_GET_AND_UPSERT, LockMode.X, LockMode.IX, LockMode.X)
+ );
+ }
+
+ private static Iterable<ReadWriteTestArg> readWriteMultiTestArguments() {
+ return List.of(
+ new ReadWriteTestArg(RequestType.RW_DELETE_ALL, LockMode.X, LockMode.IX, LockMode.IX),
+ new ReadWriteTestArg(RequestType.RW_DELETE_EXACT_ALL, LockMode.X, LockMode.IX, LockMode.IX),
+ new ReadWriteTestArg(RequestType.RW_INSERT_ALL, LockMode.X, LockMode.IX, LockMode.X),
+ new ReadWriteTestArg(RequestType.RW_UPSERT_ALL, LockMode.X, LockMode.IX, LockMode.X)
+ );
+ }
+
+ private List<Lock> locks() {
+ List<Lock> locks = new ArrayList<>();
+
+ Iterator<Lock> it = LOCK_MANAGER.locks(TRANSACTION_ID);
+
+ while (it.hasNext()) {
+ locks.add(it.next());
+ }
+
+ return locks;
+ }
+
+ private void insertRows(List<Pair<BinaryRow, RowId>> rows, UUID txId) {
+ HybridTimestamp commitTs = CLOCK.now();
+
+ for (Pair<BinaryRow, RowId> row : rows) {
+ BinaryRow tableRow = row.getFirst();
+ RowId rowId = row.getSecond();
+
+ pkStorage.get().put(tableRow, rowId);
+ TEST_MV_PARTITION_STORAGE.addWrite(rowId, tableRow, txId, TABLE_ID, PART_ID);
+ TEST_MV_PARTITION_STORAGE.commitWrite(rowId, commitTs);
+ }
+ }
+
+ protected static BinaryRow binaryRow(Integer key, Integer value) {
+ try {
+ return kvMarshaller.marshal(key, value);
+ } catch (MarshallerException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ protected static Integer key(BinaryRow binaryRow) {
+ try {
+ return kvMarshaller.unmarshalKey(new Row(schemaDescriptor, binaryRow));
+ } catch (MarshallerException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ protected static Integer value(BinaryRow binaryRow) {
+ try {
+ return kvMarshaller.unmarshalValue(new Row(schemaDescriptor, binaryRow));
+ } catch (MarshallerException e) {
+ throw new IgniteException(e);
+ }
+ }
+
+ private static Matcher<Lock> lockThat(String description, Function<Lock, Boolean> checker) {
+ return new CustomMatcher<>(description) {
+ @Override
+ public boolean matches(Object actual) {
+ return actual instanceof Lock && checker.apply((Lock) actual) == Boolean.TRUE;
+ }
+ };
+ }
+
+ static class ReadWriteTestArg {
+ private final RequestType type;
+ private final LockMode expectedLockOnUniqueHash;
+ private final LockMode expectedLockOnNonUniqueHash;
+ private final LockMode expectedLockOnSort;
+
+ public ReadWriteTestArg(
+ RequestType type,
+ LockMode expectedLockOnUniqueHash,
+ LockMode expectedLockOnNonUniqueHash,
+ LockMode expectedLockOnSort
+ ) {
+ this.type = type;
+ this.expectedLockOnUniqueHash = expectedLockOnUniqueHash;
+ this.expectedLockOnNonUniqueHash = expectedLockOnNonUniqueHash;
+ this.expectedLockOnSort = expectedLockOnSort;
+ }
+
+ @Override
+ public String toString() {
+ return type.toString();
+ }
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index eb1c640790..f8623eb19c 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -26,17 +26,20 @@ import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Locale;
+import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -47,7 +50,11 @@ import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshal
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.distributed.IndexLocker;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
@@ -55,6 +62,7 @@ import org.apache.ignite.internal.table.distributed.replicator.action.RequestTyp
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxMeta;
@@ -62,6 +70,7 @@ import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
@@ -91,9 +100,6 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** Replication group id. */
private static final ReplicationGroupId grpId = new TablePartitionId(tblId, partId);
- /** Primary index map. */
- private static final ConcurrentHashMap<ByteBuffer, RowId> primaryIndex = new ConcurrentHashMap<>();
-
/** Hybrid clock. */
private static final HybridClock clock = new HybridClock();
@@ -129,6 +135,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
/** Partition replication listener to test. */
private static PartitionReplicaListener partitionReplicaListener;
+ /** Primary index. */
+ private static Lazy<TableSchemaAwareIndexStorage> pkStorage;
+
+ private static IndexLocker pkLocker;
+
/** If true the local replica is considered leader, false otherwise. */
private static boolean localLeader;
@@ -175,14 +186,33 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
return CompletableFuture.completedFuture(txMeta);
});
+ UUID indexId = UUID.randomUUID();
+
+ BinaryTupleSchema pkSchema = BinaryTupleSchema.create(new Element[]{
+ new Element(NativeTypes.BYTES, false)
+ });
+
+ Function<BinaryRow, BinaryTuple> row2tuple = tableRow -> new BinaryTuple(pkSchema, ((BinaryRow) tableRow).keySlice());
+
+ pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
+ indexId,
+ new TestHashIndexStorage(null),
+ row2tuple
+ ));
+
+ LockManager lockManager = new HeapLockManager();
+
+ pkLocker = new HashIndexLocker(indexId, true, lockManager, row2tuple);
+
partitionReplicaListener = new PartitionReplicaListener(
testMvPartitionStorage,
mockRaftClient,
mock(TxManager.class),
- new HeapLockManager(),
+ lockManager,
partId,
tblId,
- primaryIndex,
+ () -> Map.of(pkLocker.id(), pkLocker),
+ pkStorage,
clock,
txStateStorage,
topologySrv,
@@ -206,7 +236,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private void beforeTest() {
localLeader = true;
txState = null;
- primaryIndex.clear();
+ ((TestHashIndexStorage) pkStorage.get().storage()).destroy();
}
@Test
@@ -283,7 +313,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
var rowId = new RowId(partId);
- primaryIndex.put(testBinaryKey.keySlice(), rowId);
+ pkStorage.get().put(testBinaryKey, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
testMvPartitionStorage.commitWrite(rowId, clock.now());
@@ -307,7 +337,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
var rowId = new RowId(partId);
txState = TxState.COMMITED;
- primaryIndex.put(testBinaryKey.keySlice(), rowId);
+ pkStorage.get().put(testBinaryKey, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
CompletableFuture fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
@@ -329,7 +359,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
BinaryRow testBinaryRow = binaryRow(key(testBinaryKey), new TestValue(1, "v1"));
var rowId = new RowId(partId);
- primaryIndex.put(testBinaryKey.keySlice(), rowId);
+ pkStorage.get().put(testBinaryKey, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
CompletableFuture fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
@@ -352,7 +382,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
var rowId = new RowId(partId);
txState = TxState.ABORTED;
- primaryIndex.put(testBinaryKey.keySlice(), rowId);
+ pkStorage.get().put(testBinaryKey, rowId);
testMvPartitionStorage.addWrite(rowId, testBinaryRow, txId, tblId, partId);
CompletableFuture fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index a0c1b15dbf..2ec7c99459 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -23,12 +23,12 @@ import static org.mockito.Mockito.mock;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Function;
import javax.naming.OperationNotSupportedException;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.replicator.ReplicaService;
@@ -36,10 +36,17 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowEx;
+import org.apache.ignite.internal.schema.BinaryTuple;
+import org.apache.ignite.internal.schema.BinaryTupleSchema;
+import org.apache.ignite.internal.schema.BinaryTupleSchema.Element;
+import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.storage.MvPartitionStorage;
-import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
+import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
+import org.apache.ignite.internal.table.distributed.HashIndexLocker;
+import org.apache.ignite.internal.table.distributed.IndexLocker;
+import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
@@ -50,6 +57,7 @@ import org.apache.ignite.internal.tx.impl.HeapLockManager;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.storage.state.test.TestConcurrentHashMapTxStateStorage;
+import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;
@@ -202,7 +210,22 @@ public class DummyInternalTableImpl extends InternalTableImpl {
}
).when(svc).run(any());
- var primaryIndex = new ConcurrentHashMap<ByteBuffer, RowId>();
+ UUID tableId = tableId();
+ UUID indexId = UUID.randomUUID();
+
+ BinaryTupleSchema pkSchema = BinaryTupleSchema.create(new Element[]{
+ new Element(NativeTypes.BYTES, false)
+ });
+
+ Function<BinaryRow, BinaryTuple> row2tuple = tableRow -> new BinaryTuple(pkSchema, ((BinaryRow) tableRow).keySlice());
+
+ Lazy<TableSchemaAwareIndexStorage> pkStorage = new Lazy<>(() -> new TableSchemaAwareIndexStorage(
+ indexId,
+ new TestHashIndexStorage(null),
+ row2tuple
+ ));
+
+ IndexLocker pkLocker = new HashIndexLocker(indexId, true, this.txManager.lockManager(), row2tuple);
replicaListener = new PartitionReplicaListener(
mvPartStorage,
@@ -210,20 +233,20 @@ public class DummyInternalTableImpl extends InternalTableImpl {
this.txManager,
this.txManager.lockManager(),
0,
- tableId(),
- primaryIndex,
+ tableId,
+ () -> Map.of(pkLocker.id(), pkLocker),
+ pkStorage,
new HybridClock(),
null,
null,
null
-
);
partitionListener = new PartitionListener(
mvPartStorage,
new TestConcurrentHashMapTxStateStorage(),
this.txManager,
- primaryIndex
+ () -> Map.of(pkStorage.get().id(), pkStorage.get())
);
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Lock.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Lock.java
index 29577e84a5..0083631025 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Lock.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/Lock.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx;
import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
/** Lock. */
public class Lock {
@@ -70,4 +71,9 @@ public class Lock {
public UUID txId() {
return txId;
}
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
}
diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
index bf8a05a95c..30095c12cc 100644
--- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
+++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockKey.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx;
import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
/** Lock key. */
public class LockKey {
@@ -77,4 +78,9 @@ public class LockKey {
result = 31 * result + (key != null ? key.hashCode() : 0);
return result;
}
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
}