You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2022/02/07 16:45:48 UTC
[ignite-3] branch ignite-14925-sorted-indexes updated: IGNITE-14830 Introduce sorted index prototype (#518)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch ignite-14925-sorted-indexes
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/ignite-14925-sorted-indexes by this push:
new 99fe791 IGNITE-14830 Introduce sorted index prototype (#518)
99fe791 is described below
commit 99fe79148b62652cd9d19999be55a13ef491b90f
Author: Taras Ledkov <tl...@gridgain.com>
AuthorDate: Mon Feb 7 19:45:22 2022 +0300
IGNITE-14830 Introduce sorted index prototype (#518)
---
.../{Producer.java => AbstractProducer.java} | 28 +-
.../org/apache/ignite/internal/manager/Event.java | 2 +-
.../ignite/internal/manager/EventParameters.java | 2 +-
.../apache/ignite/internal/manager/Producer.java | 49 +-
modules/index-api/pom.xml | 91 ++++
.../apache/ignite/internal/idx/IndexManager.java | 105 +++++
.../ignite/internal/idx/InternalSortedIndex.java | 73 +++
.../internal/idx/SortedIndexColumnCollation.java | 88 ++++
.../internal/idx/SortedIndexColumnDescriptor.java | 72 +++
.../ignite/internal/idx/SortedIndexDescriptor.java | 70 +++
.../ignite/internal/idx/event/IndexEvent.java} | 15 +-
.../internal/idx/event/IndexEventParameters.java | 81 ++++
modules/index/pom.xml | 135 ++++++
.../ignite/internal/idx/IndexManagerImpl.java | 513 +++++++++++++++++++++
.../internal/idx/InternalSortedIndexImpl.java | 313 +++++++++++++
modules/runner/pom.xml | 10 +
.../internal/runner/app/ItTablesApiTest.java | 7 +-
.../ignite/internal/sql/engine/ItIndexDdlTest.java | 112 +++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 20 +-
.../apache/ignite/internal/schema/BinaryRow.java | 6 +
.../ignite/internal/schema/ByteBufferRow.java | 20 +
.../org/apache/ignite/internal/schema/Column.java | 10 +
.../builder/SortedIndexDefinitionBuilderImpl.java | 4 +-
.../org/apache/ignite/internal/schema/row/Row.java | 6 +
modules/sql-engine/pom.xml | 11 +
.../internal/sql/engine/SqlQueryProcessor.java | 139 +++++-
.../sql/engine/exec/AbstractIndexScan.java | 4 +-
.../sql/engine/exec/ExecutionServiceImpl.java | 7 +-
.../ignite/internal/sql/engine/exec/IndexScan.java | 145 ++++++
.../sql/engine/exec/LogicalRelImplementor.java | 39 +-
.../sql/engine/exec/RuntimeSortedIndex.java | 16 +-
.../sql/engine/exec/ddl/DdlCommandHandler.java | 45 +-
.../sql/engine/exec/rel/IndexSpoolNode.java | 2 +-
.../FilterSpoolMergeToSortedIndexSpoolRule.java | 3 +-
.../internal/sql/engine/schema/IgniteIndex.java | 56 ++-
.../internal/sql/engine/schema/IgniteSchema.java | 10 +
.../sql/engine/schema/IgniteTableImpl.java | 10 +
.../sql/engine/schema/SqlSchemaManagerImpl.java | 81 +++-
.../internal/sql/engine/trait/TraitUtils.java | 4 +-
.../internal/sql/engine/util/IndexConditions.java | 27 +-
.../ignite/internal/sql/engine/util/RexUtils.java | 40 +-
.../internal/sql/engine/StopCalciteModuleTest.java | 6 +-
.../sql/engine/exec/MockedStructuresTest.java | 52 +--
.../sql/engine/exec/RuntimeSortedIndexTest.java | 1 +
.../CorrelatedNestedLoopJoinPlannerTest.java | 15 +-
.../engine/planner/HashIndexSpoolPlannerTest.java | 19 +-
.../internal/sql/engine/planner/PlannerTest.java | 2 +-
.../planner/SortedIndexSpoolPlannerTest.java | 33 +-
modules/storage-api/pom.xml | 5 +
.../storage/engine/StorageRowListener.java} | 55 ++-
.../internal/storage/engine/TableStorage.java | 5 +-
.../ignite/internal/storage/index/IndexRow.java | 24 +-
.../internal/storage/index/IndexRowPrefix.java | 17 +-
.../storage/index/IndexSchemaDescriptor.java} | 23 +-
.../storage/index/SortedIndexDescriptor.java | 206 ---------
.../internal/storage/index/SortedIndexStorage.java | 19 +-
.../storage/rocksdb/RocksDbTableStorage.java | 34 +-
.../rocksdb/index/BinaryIndexRowDeserializer.java | 48 --
.../rocksdb/index/BinaryIndexRowFactory.java | 92 ----
.../rocksdb/index/BinaryIndexRowSerializer.java | 72 +++
.../storage/rocksdb/index/BinaryRowComparator.java | 13 +-
.../storage/rocksdb/index/IndexBinaryRow.java} | 18 +-
...BinaryIndexRow.java => IndexBinaryRowImpl.java} | 49 +-
.../rocksdb}/index/IndexRowDeserializer.java | 8 +-
.../storage/rocksdb/index/IndexRowImpl.java | 117 +++++
.../storage/rocksdb/index/IndexRowSerializer.java} | 12 +-
.../storage/rocksdb/index/PrefixComparator.java | 21 +-
.../rocksdb/index/RocksDbSortedIndexStorage.java | 53 +--
.../index/SortedIndexStorageDescriptor.java | 83 ++++
.../storage/rocksdb/index/IndexRowWrapper.java | 69 +--
.../index/RocksDbSortedIndexStorageTest.java | 290 ++++--------
.../storage/rocksdb/index/TestIndexRow.java} | 45 +-
.../ignite/internal/table/StorageRowListener.java} | 55 ++-
.../apache/ignite/internal/table/TableImpl.java | 27 +-
.../internal/table/TableStorageRowListener.java | 51 ++
.../internal/table/distributed/TableManager.java | 16 +-
.../distributed/storage/VersionedRowStore.java | 21 +
parent/pom.xml | 12 +
pom.xml | 2 +
79 files changed, 3121 insertions(+), 1040 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/AbstractProducer.java
similarity index 79%
copy from modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
copy to modules/core/src/main/java/org/apache/ignite/internal/manager/AbstractProducer.java
index 665b9d9..8a86a30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/AbstractProducer.java
@@ -26,38 +26,24 @@ import org.jetbrains.annotations.Nullable;
/**
* Interface which can produce its events.
*/
-public abstract class Producer<T extends Event, P extends EventParameters> {
+public abstract class AbstractProducer<T extends Event, P extends EventParameters> implements Producer<T, P> {
/** All listeners. */
private ConcurrentHashMap<T, ConcurrentLinkedQueue<EventListener<P>>> listeners = new ConcurrentHashMap<>();
- /**
- * Registers an event listener. When the event predicate returns true it would never invoke after, otherwise this predicate would
- * receive an event again.
- *
- * @param evt Event.
- * @param closure Closure.
- */
+ /** {@inheritDoc} */
+ @Override
public void listen(T evt, EventListener<P> closure) {
listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).offer(closure);
}
- /**
- * Removes a listener associated with the event.
- *
- * @param evt Event.
- * @param closure Closure.
- */
+ /** {@inheritDoc} */
+ @Override
public void removeListener(T evt, EventListener<P> closure) {
removeListener(evt, closure, null);
}
- /**
- * Removes a listener associated with the event.
- *
- * @param evt Event.
- * @param closure Closure.
- * @param cause The exception that was a cause which a listener is removed.
- */
+ /** {@inheritDoc} */
+ @Override
public void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause) {
if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure)) {
closure.remove(cause == null ? new ListenerRemovedException() : cause.getCause() == null ? cause : cause.getCause());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
index 9d7ca85..e335a59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.manager;
/**
* The event cas which is produced by event producer component.
*
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
+ * @see Producer#listen(Event, EventListener)
*/
public interface Event {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
index 0d230df..f06669a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/EventParameters.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.manager;
/**
* Event parameters. This type passed to the event listener.
*
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
+ * @see AbstractProducer#fireEvent(Event, EventParameters, Throwable)
*/
public interface EventParameters {
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
index 665b9d9..04b9e43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/manager/Producer.java
@@ -17,19 +17,13 @@
package org.apache.ignite.internal.manager;
-import java.util.Iterator;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.jetbrains.annotations.Nullable;
/**
* Interface which can produce its events.
*/
-public abstract class Producer<T extends Event, P extends EventParameters> {
- /** All listeners. */
- private ConcurrentHashMap<T, ConcurrentLinkedQueue<EventListener<P>>> listeners = new ConcurrentHashMap<>();
-
+public interface Producer<T extends Event, P extends EventParameters> {
/**
* Registers an event listener. When the event predicate returns true it would never invoke after, otherwise this predicate would
* receive an event again.
@@ -37,9 +31,7 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
* @param evt Event.
* @param closure Closure.
*/
- public void listen(T evt, EventListener<P> closure) {
- listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).offer(closure);
- }
+ void listen(T evt, EventListener<P> closure);
/**
* Removes a listener associated with the event.
@@ -47,9 +39,7 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
* @param evt Event.
* @param closure Closure.
*/
- public void removeListener(T evt, EventListener<P> closure) {
- removeListener(evt, closure, null);
- }
+ void removeListener(T evt, EventListener<P> closure);
/**
* Removes a listener associated with the event.
@@ -58,36 +48,5 @@ public abstract class Producer<T extends Event, P extends EventParameters> {
* @param closure Closure.
* @param cause The exception that was a cause which a listener is removed.
*/
- public void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause) {
- if (listeners.computeIfAbsent(evt, evtKey -> new ConcurrentLinkedQueue<>()).remove(closure)) {
- closure.remove(cause == null ? new ListenerRemovedException() : cause.getCause() == null ? cause : cause.getCause());
- }
- }
-
- /**
- * Notifies every listener that subscribed before.
- *
- * @param evt Event type.
- * @param params Event parameters.
- * @param err Exception when it was happened, or {@code null} otherwise.
- */
- protected void fireEvent(T evt, P params, Throwable err) {
- ConcurrentLinkedQueue<EventListener<P>> queue = listeners.get(evt);
-
- if (queue == null) {
- return;
- }
-
- EventListener<P> closure;
-
- Iterator<EventListener<P>> iter = queue.iterator();
-
- while (iter.hasNext()) {
- closure = iter.next();
-
- if (closure.notify(params, err)) {
- iter.remove();
- }
- }
- }
+ void removeListener(T evt, EventListener<P> closure, @Nullable IgniteInternalCheckedException cause);
}
diff --git a/modules/index-api/pom.xml b/modules/index-api/pom.xml
new file mode 100644
index 0000000..cab2544
--- /dev/null
+++ b/modules/index-api/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-index-api</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-schema</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <annotationProcessorPaths>
+ <path>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration-annotation-processor</artifactId>
+ <version>${project.version}</version>
+ </path>
+ </annotationProcessorPaths>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/IndexManager.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/IndexManager.java
new file mode 100644
index 0000000..f1a1950
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/IndexManager.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import org.apache.ignite.configuration.schemas.table.TableIndexChange;
+import org.apache.ignite.internal.idx.event.IndexEvent;
+import org.apache.ignite.internal.idx.event.IndexEventParameters;
+import org.apache.ignite.internal.manager.Producer;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
+import org.apache.ignite.lang.NodeStoppingException;
+
+/**
+ * Internal index manager facade provides low-level methods for indexes operations.
+ */
+public interface IndexManager extends Producer<IndexEvent, IndexEventParameters> {
+ /**
+ * Creates a new index with the specified name.
+ *
+ * @param idxCanonicalName Index canonical name.
+ * @param tblCanonicalName Table canonical name.
+ * @param idxChange Index configuration.
+ * @return Index.
+ * @throws IndexAlreadyExistsException if the index exists.
+ */
+ InternalSortedIndex createIndex(
+ String idxCanonicalName,
+ String tblCanonicalName,
+ Consumer<TableIndexChange> idxChange
+ );
+
+ /**
+ * Create index asynchronously.
+ *
+ * @param idxCanonicalName Index canonical name.
+ * @param tblCanonicalName Table canonical name.
+ * @param idxChange Index configuration.
+ * @return Index future, that may be completed exceptionally with {@link IndexAlreadyExistsException} if the index exists.
+ */
+ CompletableFuture<InternalSortedIndex> createIndexAsync(
+ String idxCanonicalName,
+ String tblCanonicalName,
+ Consumer<TableIndexChange> idxChange
+ );
+
+ /**
+ * Drop index.
+ *
+ * @param idxCanonicalName Index canonical name.
+ * @throws IndexAlreadyExistsException if the index doesn't exist.
+ */
+ void dropIndex(String idxCanonicalName);
+
+ /**
+ * Drop index asynchronously.
+ *
+ * @param idxCanonicalName Index canonical name.
+ * @return Index future, that may be completed exceptionally with {@link IndexNotFoundException} if the index doesn't exist.
+ */
+ CompletableFuture<Void> dropIndexAsync(String idxCanonicalName);
+
+ /**
+ * Gets indexes of the table.
+ *
+ * @param tblId Table identifier to lookup indexes.
+ * @return Indexes of the table.
+ * @throws NodeStoppingException If an implementation stopped before the method was invoked.
+ */
+ List<InternalSortedIndex> indexes(UUID tblId);
+
+ /**
+ * Gets index of the table.
+ *
+ * @return Index of the table.
+ * @throws NodeStoppingException If an implementation stopped before the method was invoked.
+ */
+ InternalSortedIndex index(String idxCanonicalName);
+
+ /**
+ * Gets index of the table asynchronously.
+ *
+ * @return Index future.
+ * @throws NodeStoppingException If an implementation stopped before the method was invoked.
+ */
+ CompletableFuture<InternalSortedIndex> indexAsync(String idxCanonicalName);
+}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndex.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndex.java
new file mode 100644
index 0000000..ba9b54c
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndex.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.BitSet;
+import java.util.UUID;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.table.Tuple;
+
+/**
+ * Sorted index facade.
+ */
+public interface InternalSortedIndex {
+ /** Exclude lower bound. */
+ byte GREATER = 0;
+
+ /** Include lower bound. */
+ byte GREATER_OR_EQUAL = 1;
+
+ /** Exclude upper bound. */
+ byte LESS = 0;
+
+ /** Include upper bound. */
+ byte LESS_OR_EQUAL = 1 << 1;
+
+ /**
+ * Return index name.
+ *
+ * @return Index name.
+ */
+ String name();
+
+ /**
+ * Return indexed table.
+ *
+ * @return Indexed table identifier.
+ */
+ UUID tableId();
+
+ /**
+ * Return index descriptor.
+ */
+ SortedIndexDescriptor descriptor();
+
+ /**
+ * Return rows between lower and upper bounds. Fill results rows by fields specified at the projection set.
+ *
+ * @param low Lower bound of the scan.
+ * @param up Lower bound of the scan.
+ * @param scanBoundMask Scan bound mask (specify how to work with rows equals to the bounds: include or exclude).
+ */
+ Cursor<Tuple> scan(Tuple low, Tuple up, byte scanBoundMask, BitSet proj);
+
+ /**
+ * Drop index.
+ */
+ void drop();
+}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnCollation.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnCollation.java
new file mode 100644
index 0000000..59644b7
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnCollation.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.Objects;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Index column collation: direction and NULLs order that a column's value is ordered in.
+ */
+public class SortedIndexColumnCollation {
+ private final boolean asc;
+
+ private final boolean nullsFirst;
+
+ /**
+ * Constructor.
+ *
+ * @param asc {@code true} if this column is sorted in ascending order or {@code false} otherwise.
+ */
+ public SortedIndexColumnCollation(
+ boolean asc
+ ) {
+ this(asc, false);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param asc {@code true} if this column is sorted in ascending order or {@code false} otherwise.
+ * @param nullsFirst NULL direction. If the flag is {@code true} NULLs are placed before all values {@code false} otherwise.
+ */
+ public SortedIndexColumnCollation(
+ boolean asc,
+ boolean nullsFirst
+ ) {
+ this.asc = asc;
+ this.nullsFirst = nullsFirst;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SortedIndexColumnCollation that = (SortedIndexColumnCollation) o;
+ return asc == that.asc && nullsFirst == that.nullsFirst;
+ }
+
+ public boolean asc() {
+ return asc;
+ }
+
+ public boolean isNullsFirst() {
+ return nullsFirst;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(asc, nullsFirst);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ return S.toString(SortedIndexColumnCollation.class, this);
+ }
+}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnDescriptor.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnDescriptor.java
new file mode 100644
index 0000000..382aa1a
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexColumnDescriptor.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Descriptor of a Sorted Index column (column name and column sort order).
+ */
+public class SortedIndexColumnDescriptor {
+ private final Column column;
+
+ private final SortedIndexColumnCollation collation;
+
+ public SortedIndexColumnDescriptor(Column column, boolean asc) {
+ this(column, new SortedIndexColumnCollation(asc));
+ }
+
+ public SortedIndexColumnDescriptor(Column column, SortedIndexColumnCollation collation) {
+ this.column = column;
+ this.collation = collation;
+ }
+
+ /**
+ * Returns a column descriptor.
+ */
+ public Column column() {
+ return column;
+ }
+
+ /**
+ * Returns column collation.
+ */
+ public SortedIndexColumnCollation collation() {
+ return collation;
+ }
+
+ /**
+ * Returns {@code true} if this column is sorted in ascending order or {@code false} otherwise.
+ */
+ public boolean asc() {
+ return collation.asc();
+ }
+
+ /**
+ * Returns {@code true} if this column can contain null values or {@code false} otherwise.
+ */
+ public boolean nullable() {
+ return column.nullable();
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexDescriptor.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexDescriptor.java
new file mode 100644
index 0000000..b400e10
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/SortedIndexDescriptor.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.List;
+import org.apache.ignite.internal.schema.Column;
+
+/**
+ * Descriptor for creating a Sorted Index Storage.
+ */
+public class SortedIndexDescriptor {
+ private final String name;
+
+ private final List<SortedIndexColumnDescriptor> columns;
+
+ private final Column[] pkColumns;
+
+ /**
+ * Creates an Index Descriptor from a given Table Configuration.
+ *
+ * @param name Index name.
+ * @param columns Indexed columns (configured columns + primary key columns).
+ * @param pkColumns Pk columns.
+ */
+ public SortedIndexDescriptor(
+ String name,
+ final List<SortedIndexColumnDescriptor> columns,
+ Column[] pkColumns) {
+ this.name = name;
+ this.pkColumns = pkColumns;
+
+ this.columns = columns;
+ }
+
+ /**
+ * Returns this index' name.
+ */
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Returns the Column Descriptors that comprise a row of this index (indexed columns + primary key columns).
+ */
+ public List<SortedIndexColumnDescriptor> columns() {
+ return columns;
+ }
+
+ /**
+ * Returns primary key columns.
+ */
+ public Column[] pkColumns() {
+ return pkColumns;
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEvent.java
similarity index 72%
copy from modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
copy to modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEvent.java
index 9d7ca85..11e998f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEvent.java
@@ -15,12 +15,17 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.idx.event;
+
+import org.apache.ignite.internal.manager.Event;
/**
- * The event cas which is produced by event producer component.
- *
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
+ * Index management events.
*/
-public interface Event {
+public enum IndexEvent implements Event {
+ /** This event is fired when an index was created. */
+ CREATE,
+
+ /** This event is fired when an index was dropped. */
+ DROP
}
diff --git a/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEventParameters.java b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEventParameters.java
new file mode 100644
index 0000000..3b286a5
--- /dev/null
+++ b/modules/index-api/src/main/java/org/apache/ignite/internal/idx/event/IndexEventParameters.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx.event;
+
+import org.apache.ignite.internal.idx.InternalSortedIndex;
+import org.apache.ignite.internal.manager.EventParameters;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Table event parameters. There are properties which associate with a concrete table.
+ */
+public class IndexEventParameters implements EventParameters {
+ /** Index name. */
+ private final String idxName;
+
+ /** Indexed table name. */
+ private final String tblName;
+
+ /** Table instance. */
+ private final InternalSortedIndex idx;
+
+ /**
+ * Constructor.
+ *
+ * @param tblName Table name.
+ * @param idx Index instance.
+ */
+ public IndexEventParameters(String tblName, InternalSortedIndex idx) {
+ this(idx.name(), tblName, idx);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param idxName Index name.
+ * @param tblName Table name.
+ */
+ public IndexEventParameters(String idxName, String tblName) {
+ this(idxName, tblName, null);
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param idxName Index name.
+ * @param tblName Table name.
+ * @param idx Index.
+ */
+ private IndexEventParameters(String idxName, String tblName, @Nullable InternalSortedIndex idx) {
+ this.idxName = idxName;
+ this.tblName = tblName;
+ this.idx = idx;
+ }
+
+ public String indexName() {
+ return idxName;
+ }
+
+ public String tableName() {
+ return tblName;
+ }
+
+ public InternalSortedIndex index() {
+ return idx;
+ }
+}
diff --git a/modules/index/pom.xml b/modules/index/pom.xml
new file mode 100644
index 0000000..6314307
--- /dev/null
+++ b/modules/index/pom.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-parent</artifactId>
+ <version>1</version>
+ <relativePath>../../parent/pom.xml</relativePath>
+ </parent>
+
+ <artifactId>ignite-index</artifactId>
+ <version>3.0.0-SNAPSHOT</version>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-transactions</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-index-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-schema</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-storage-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-table</artifactId>
+ </dependency>
+
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-api</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-configuration</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Logging in tests -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-jdk14</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ </plugins>
+ </build>
+</project>
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/idx/IndexManagerImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/idx/IndexManagerImpl.java
new file mode 100644
index 0000000..ebada67
--- /dev/null
+++ b/modules/index/src/main/java/org/apache/ignite/internal/idx/IndexManagerImpl.java
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import static org.apache.ignite.internal.configuration.util.ConfigurationUtil.directProxy;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.configuration.ConfigurationChangeException;
+import org.apache.ignite.configuration.NamedListView;
+import org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
+import org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
+import org.apache.ignite.configuration.schemas.table.IndexColumnView;
+import org.apache.ignite.configuration.schemas.table.SortedIndexView;
+import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.configuration.schemas.table.TableIndexChange;
+import org.apache.ignite.configuration.schemas.table.TableIndexView;
+import org.apache.ignite.configuration.schemas.table.TableView;
+import org.apache.ignite.configuration.schemas.table.TablesConfiguration;
+import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguration;
+import org.apache.ignite.internal.idx.event.IndexEvent;
+import org.apache.ignite.internal.idx.event.IndexEventParameters;
+import org.apache.ignite.internal.manager.AbstractProducer;
+import org.apache.ignite.internal.manager.EventListener;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.distributed.TableManager;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
+import org.apache.ignite.lang.NodeStoppingException;
+import org.apache.ignite.lang.TableNotFoundException;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal index manager facade provides low-level methods for indexes operations.
+ */
+public class IndexManagerImpl extends AbstractProducer<IndexEvent, IndexEventParameters> implements IndexManager, IgniteComponent {
+ private static final IgniteLogger LOG = IgniteLogger.forClass(IndexManagerImpl.class);
+
+ private final TablesConfiguration tablesCfg;
+
+ private final TableManager tblMgr;
+
+ /** Indexes by canonical name. */
+ private final Map<String, InternalSortedIndex> idxsByName = new ConcurrentHashMap<>();
+
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ /** Prevents double stopping the component. */
+ private final AtomicBoolean stopGuard = new AtomicBoolean();
+
+ /**
+ * Constructor.
+ *
+ * @param tablesCfg Tables configuration.
+ */
+ public IndexManagerImpl(
+ TableManager tblMgr,
+ TablesConfiguration tablesCfg
+ ) {
+ this.tblMgr = tblMgr;
+ this.tablesCfg = tablesCfg;
+ }
+
+ @Override
+ public void start() {
+ tablesCfg.tables().any().indices().listenElements(
+ new ConfigurationNamedListListener<>() {
+ @Override
+ public @NotNull CompletableFuture<?> onCreate(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
+ TableConfiguration tbl = ctx.config(TableConfiguration.class);
+ String tblName = tbl.name().value();
+ String idxName = ctx.newValue().name();
+
+ if (!busyLock.enterBusy()) {
+ fireEvent(IndexEvent.CREATE,
+ new IndexEventParameters(
+ idxName,
+ tblName),
+ new NodeStoppingException());
+
+ return CompletableFuture.completedFuture(new NodeStoppingException());
+ }
+ try {
+ onIndexCreate(ctx);
+ } catch (Exception e) {
+ fireEvent(IndexEvent.CREATE, new IndexEventParameters(idxName, tblName), e);
+
+ LOG.error("Internal error, index creation failed [name={}, table={}]", e, idxName, tblName);
+
+ return CompletableFuture.completedFuture(e);
+ } finally {
+ busyLock.leaveBusy();
+ }
+
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public @NotNull CompletableFuture<?> onRename(@NotNull String oldName, @NotNull String newName,
+ @NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
+ // TODO: IGNITE-16196 Supports index rename
+ return ConfigurationNamedListListener.super.onRename(oldName, newName, ctx);
+ }
+
+ @Override
+ public @NotNull CompletableFuture<?> onDelete(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
+ TableConfiguration tbl = ctx.config(TableConfiguration.class);
+ String tblName = tbl.name().value();
+ String idxName = ctx.oldValue().name();
+
+ if (!busyLock.enterBusy()) {
+ fireEvent(IndexEvent.DROP,
+ new IndexEventParameters(
+ idxName,
+ tblName
+ ),
+ new NodeStoppingException());
+
+ return CompletableFuture.completedFuture(new NodeStoppingException());
+ }
+ try {
+ onIndexDrop(ctx);
+ } finally {
+ busyLock.leaveBusy();
+ }
+
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public @NotNull CompletableFuture<?> onUpdate(@NotNull ConfigurationNotificationEvent<TableIndexView> ctx) {
+ assert false : "Index cannot be updated [ctx=" + ctx + ']';
+
+ return CompletableFuture.completedFuture(null);
+ }
+ });
+ }
+
+ private void onIndexDrop(ConfigurationNotificationEvent<TableIndexView> ctx) {
+ TableConfiguration tbl = ctx.config(TableConfiguration.class);
+ String tblName = tbl.name().value();
+
+ InternalSortedIndex idx = idxsByName.remove(ctx.oldValue().name());
+
+ idx.drop();
+
+ fireEvent(IndexEvent.DROP, new IndexEventParameters(
+ idx.name(),
+ tblName
+ ), null);
+ }
+
+ private void onIndexCreate(ConfigurationNotificationEvent<TableIndexView> ctx) throws NodeStoppingException {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-15916
+ assert ctx.newValue() instanceof SortedIndexView : "Unsupported index type: " + ctx.newValue();
+
+ ExtendedTableConfiguration tblCfg = ctx.config(TableConfiguration.class);
+
+ TableImpl tbl = tblMgr.table(tblCfg.id().value());
+
+ createIndexLocally(tbl, (SortedIndexView) ctx.newValue());
+ }
+
+ @Override
+ public void stop() throws Exception {
+ if (!stopGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+ }
+
+ @Override
+ public List<InternalSortedIndex> indexes(UUID tblId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public InternalSortedIndex createIndex(
+ String idxCanonicalName,
+ String tblCanonicalName,
+ Consumer<TableIndexChange> idxChange
+ ) {
+ return join(createIndexAsync(idxCanonicalName, tblCanonicalName, idxChange));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void dropIndex(
+ String idxCanonicalName
+ ) {
+ join(dropIndexAsync(idxCanonicalName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<Void> dropIndexAsync(String idxCanonicalName) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+ try {
+ CompletableFuture<Void> idxFut = new CompletableFuture<>();
+
+ indexAsyncInternal(idxCanonicalName).thenAccept((idx) -> {
+ if (idx == null) {
+ idxFut.completeExceptionally(new IndexNotFoundException(idxCanonicalName));
+ return;
+ }
+
+ try {
+ CompletableFuture<TableImpl> tblFut = tblMgr.tableAsync(idx.tableId());
+
+ tblFut.thenAccept(tbl -> {
+ tablesCfg.tables().change(ch -> ch.createOrUpdate(
+ tbl.name(),
+ tblCh -> tblCh.changeIndices(idxes -> idxes.delete(idxCanonicalName))
+ ))
+ .whenComplete((res, t) -> {
+ if (t != null) {
+ Throwable ex = getRootCause(t);
+
+ if (!(ex instanceof IndexNotFoundException)) {
+ LOG.error("Index wasn't dropped [name={}]", ex, idxCanonicalName);
+ }
+
+ idxFut.completeExceptionally(ex);
+ } else {
+ idxFut.complete(null);
+ }
+ });
+ });
+ } catch (NodeStoppingException ex) {
+ idxFut.completeExceptionally(ex);
+ }
+ });
+
+ return idxFut;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<InternalSortedIndex> createIndexAsync(
+ String idxCanonicalName,
+ String tblCanonicalName,
+ Consumer<TableIndexChange> idxChange
+ ) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+ try {
+ CompletableFuture<InternalSortedIndex> idxFut = new CompletableFuture<>();
+
+ tblMgr.tableAsync(tblCanonicalName).thenAccept((tbl) -> {
+ if (tbl == null) {
+ idxFut.completeExceptionally(new TableNotFoundException(tblCanonicalName));
+ }
+
+ indexAsyncInternal(idxCanonicalName).thenAccept((idx) -> {
+ if (idx != null) {
+ idxFut.completeExceptionally(new IndexAlreadyExistsException(idxCanonicalName));
+ }
+
+ tblMgr.alterTableAsync(tblCanonicalName, chng -> chng.changeIndices(idxes -> {
+ if (idxes.get(idxCanonicalName) != null) {
+ idxFut.completeExceptionally(new IndexAlreadyExistsException(idxCanonicalName));
+ }
+
+ idxes.create(idxCanonicalName, idxChange::accept);
+ }
+ )).whenComplete((res, t) -> {
+ if (t != null) {
+ Throwable ex = getRootCause(t);
+
+ if (!(ex instanceof IndexAlreadyExistsException)) {
+ LOG.error("Index wasn't created [name={}]", ex, idxCanonicalName);
+ }
+
+ idxFut.completeExceptionally(ex);
+ } else {
+ idxFut.complete(idxsByName.get(idxCanonicalName));
+ }
+ });
+ });
+ });
+
+ return idxFut;
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public InternalSortedIndex index(String idxCanonicalName) {
+ return join(indexAsync(idxCanonicalName));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public CompletableFuture<InternalSortedIndex> indexAsync(String idxCanonicalName) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+ try {
+ return indexAsyncInternal(idxCanonicalName);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ private CompletableFuture<InternalSortedIndex> indexAsyncInternal(String idxCanonicalName) {
+ var idx = idxsByName.get(idxCanonicalName);
+
+ if (idx != null) {
+ return CompletableFuture.completedFuture(idx);
+ }
+
+ CompletableFuture<InternalSortedIndex> getIdxFut = new CompletableFuture<>();
+
+ EventListener<IndexEventParameters> clo = new EventListener<>() {
+ @Override
+ public boolean notify(@NotNull IndexEventParameters parameters, @Nullable Throwable e) {
+ if (!idxCanonicalName.equals(parameters.indexName())) {
+ return false;
+ }
+
+ if (e == null) {
+ getIdxFut.complete(parameters.index());
+ } else {
+ getIdxFut.completeExceptionally(e);
+ }
+
+ return true;
+ }
+
+ @Override
+ public void remove(@NotNull Throwable e) {
+ getIdxFut.completeExceptionally(e);
+ }
+ };
+
+ listen(IndexEvent.CREATE, clo);
+
+ idx = idxsByName.get(idxCanonicalName);
+
+ if (idx != null && getIdxFut.complete(idx) || !isIndexConfigured(idxCanonicalName) && getIdxFut.complete(null)) {
+ removeListener(IndexEvent.CREATE, clo, null);
+ } else {
+ getIdxFut.thenRun(() -> removeListener(IndexEvent.CREATE, clo, null));
+ }
+
+ return getIdxFut;
+ }
+
+ /**
+ * Checks that the index is configured with specific name.
+ *
+ * @param idxName Index name.
+ * @return True when the table is configured into cluster, false otherwise.
+ */
+ private boolean isIndexConfigured(String idxName) {
+ NamedListView<TableView> directTablesCfg = directProxy(tablesCfg.tables()).value();
+
+ // TODO: IGNITE-15721 Need to review this approach after the ticket would be fixed.
+ // Probably, it won't be required getting configuration of all tables from Metastore.
+ for (String name : directTablesCfg.namedListKeys()) {
+ TableView tableView = directTablesCfg.get(name);
+
+ if (tableView != null) {
+ if (tableView.indices().get(idxName) != null) {
+ return true;
+ }
+ }
+ }
+
+ return false;
+ }
+
+
+ /**
+ * Waits for future result and return, or unwraps {@link CompletionException} to {@link IgniteException} if failed.
+ *
+ * @param future Completable future.
+ * @return Future result.
+ */
+ private <T> T join(CompletableFuture<T> future) {
+ if (!busyLock.enterBusy()) {
+ throw new IgniteException(new NodeStoppingException());
+ }
+
+ try {
+ return future.join();
+ } catch (CompletionException ex) {
+ throw convertThrowable(ex.getCause());
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+
+ /**
+ * Convert to public throwable.
+ *
+ * @param th Throwable.
+ * @return Public throwable.
+ */
+ private RuntimeException convertThrowable(Throwable th) {
+ if (th instanceof RuntimeException) {
+ return (RuntimeException) th;
+ }
+
+ return new IgniteException(th);
+ }
+
+ /**
+ * Gets a cause exception for a client.
+ *
+ * @param t Exception wrapper.
+ * @return A root exception which will be acceptable to throw for public API.
+ */
+ //TODO: IGNITE-16051 Implement exception converter for public API.
+ private @NotNull IgniteException getRootCause(Throwable t) {
+ Throwable ex;
+
+ if (t instanceof CompletionException) {
+ if (t.getCause() instanceof ConfigurationChangeException) {
+ ex = t.getCause().getCause();
+ } else {
+ ex = t.getCause();
+ }
+ } else {
+ ex = t;
+ }
+
+ return ex instanceof IgniteException ? (IgniteException) ex : new IgniteException(ex);
+ }
+
+ private void createIndexLocally(TableImpl tbl, SortedIndexView idxView) {
+ SchemaDescriptor tblSchema = tbl.schemaView().schema();
+
+ // Map<String, Column> cols = Stream.concat(
+ // Arrays.stream(tblSchema.keyColumns().columns()),
+ // Arrays.stream(tblSchema.valueColumns().columns())
+ // ).collect(Collectors.toMap(Column::name, Function.identity()));
+
+ List<SortedIndexColumnDescriptor> idxCols = Stream.concat(
+ idxView.columns().namedListKeys().stream(),
+ Arrays.stream(tblSchema.keyColumns().columns())
+ .map(Column::name)
+ )
+ .distinct()
+ .map(colName -> {
+ IndexColumnView idxCol = idxView.columns().get(colName);
+ return new SortedIndexColumnDescriptor(
+ tblSchema.column(colName),
+ new SortedIndexColumnCollation(idxCol == null || idxCol.asc()));
+ })
+ .collect(Collectors.toList());
+
+ SortedIndexDescriptor idxDesc = new SortedIndexDescriptor(
+ idxView.name(),
+ idxCols,
+ tblSchema.keyColumns().columns()
+ );
+
+ InternalSortedIndexImpl idx = new InternalSortedIndexImpl(
+ idxView.name(),
+ tbl.internalTable().storage().createSortedIndex(idxDesc),
+ tbl
+ );
+
+ tbl.addRowListener(idx);
+
+ idxsByName.put(idx.name(), idx);
+
+ fireEvent(IndexEvent.CREATE, new IndexEventParameters(tbl.name(), idx), null);
+ }
+}
diff --git a/modules/index/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndexImpl.java b/modules/index/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndexImpl.java
new file mode 100644
index 0000000..b7c598c
--- /dev/null
+++ b/modules/index/src/main/java/org/apache/ignite/internal/idx/InternalSortedIndexImpl.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.idx;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.storage.index.IndexRowPrefix;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.internal.table.StorageRowListener;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableRow;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Internal index manager facade provides low-level methods for indexes operations.
+ */
+public class InternalSortedIndexImpl implements InternalSortedIndex, StorageRowListener {
+ private final String name;
+
+ private final TableImpl tbl;
+
+ private final SortedIndexStorage store;
+
+ private final SortedIndexDescriptor desc;
+
+ /**
+ * Create sorted index.
+ */
+ public InternalSortedIndexImpl(String name, SortedIndexStorage store, TableImpl tbl) {
+ this.name = name;
+ this.store = store;
+ this.tbl = tbl;
+
+ desc = store.indexDescriptor();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UUID tableId() {
+ return tbl.tableId();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public SortedIndexDescriptor descriptor() {
+ return desc;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Cursor<Tuple> scan(Tuple low, Tuple up, byte scanBoundMask, BitSet proj) {
+ Cursor<IndexRow> cur = store.range(
+ low != null ? new IndexRowPrefixTuple(low) {
+ } : null,
+ up != null ? new IndexRowPrefixTuple(up) : null,
+ r -> true
+ );
+
+ List<String> tblColIds = new ArrayList<>(tbl.schemaView().schema().columnNames());
+ Set<String> idxColIds = desc.columns().stream().map(SortedIndexColumnDescriptor::column).map(Column::name)
+ .collect(Collectors.toSet());
+
+ boolean needLookup = proj.stream().anyMatch(order -> !idxColIds.contains(tblColIds.get(order)));
+
+ final TupleFactory tupleFactory = new TupleFactory(tbl.schemaView().lastSchemaVersion());
+
+ return new IndexCursor(
+ cur,
+ needLookup ? r -> convertWithTableLookup(r, tupleFactory) : r -> convertIndexedOnly(r, tupleFactory)
+ );
+ }
+
+ @Override
+ public void drop() {
+ tbl.internalTable().storage().dropIndex(name);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId) {
+ Tuple tupleNew = TableRow.tuple(tbl.schemaView().resolve(newRow));
+
+ store.put(new IndexRowTuple(tupleNew, newRow.keyRow(), partId));
+
+ if (oldRow != null) {
+ Tuple tupleOld = TableRow.tuple(tbl.schemaView().resolve(oldRow));
+
+ store.remove(new IndexRowTuple(tupleOld, oldRow.keyRow(), partId));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onRemove(BinaryRow row, int partId) {
+ Tuple t = TableRow.tuple(tbl.schemaView().resolve(row));
+
+ store.remove(new IndexRowTuple(t, row.keyRow(), partId));
+ }
+
+ /**
+ * Used for index only scan.
+ */
+ private Tuple convertIndexedOnly(IndexRow r, TupleFactory tupleFactory) {
+ Tuple t = tupleFactory.create();
+
+ for (int i = 0; i < desc.columns().size(); ++i) {
+ t.set(desc.columns().get(i).column().name(), r.value(i));
+ }
+
+ return t;
+ }
+
+ /**
+ * Additional lookup full row at the table by PK.
+ */
+ private Tuple convertWithTableLookup(IndexRow r, TupleFactory tupleFactory) {
+ try {
+ BinaryRow tblRow = tbl.internalTable().get(r.primaryKey(), null).get();
+
+ Tuple tblTuple = TableRow.tuple(tbl.schemaView().resolve(tblRow));
+
+ Tuple res = tupleFactory.create();
+
+ for (int i = 0; i < res.columnCount(); ++i) {
+ res.set(res.columnName(i), tblTuple.value(res.columnName(i)));
+ }
+
+ return res;
+ } catch (Exception e) {
+ throw new IgniteInternalException("Error on row lookup by index PK "
+ + "[index=" + name + ", indexRow=" + r + ']', e);
+ }
+ }
+
+ /** Creates table tuple on specified schema. */
+ private class TupleFactory {
+ private final List<String> cols;
+
+ TupleFactory(int tblSchemaVersion) {
+ cols = Stream.concat(
+ Arrays.stream(tbl.schemaView().schema(tblSchemaVersion).keyColumns().columns()),
+ Arrays.stream(tbl.schemaView().schema(tblSchemaVersion).valueColumns().columns())
+ )
+ .sorted(Comparator.comparing(Column::columnOrder))
+ .map(Column::name)
+ .collect(Collectors.toList());
+ }
+
+ Tuple create() {
+ Tuple t = Tuple.create();
+
+ for (String colName : cols) {
+ t.set(colName, null);
+ }
+
+ return t;
+ }
+ }
+
+ private class IndexRowTuple implements IndexRow {
+ private final Tuple tuple;
+
+ private final BinaryRow pk;
+
+ private final int part;
+
+ IndexRowTuple(Tuple t, BinaryRow pk, int part) {
+ this.tuple = t;
+ this.pk = pk;
+ this.part = part;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public BinaryRow primaryKey() {
+ return pk;
+ }
+
+ @Override
+ public int partition() {
+ return part;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Object value(int idxColOrder) {
+ return tuple.value(desc.columns().get(idxColOrder).column().name());
+ }
+ }
+
+ private static class IndexRowPrefixTuple implements IndexRowPrefix {
+ private final Tuple tuple;
+
+ IndexRowPrefixTuple(Tuple t) {
+ this.tuple = t;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Object value(int idxColOrder) {
+ return tuple.value(idxColOrder);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int length() {
+ return tuple.columnCount();
+ }
+ }
+
+ /**
+ * Index store cursor adapter.
+ */
+ private static class IndexCursor implements Cursor<Tuple> {
+ private final Cursor<IndexRow> cur;
+ private final IndexIterator it;
+
+ IndexCursor(Cursor<IndexRow> cur, Function<IndexRow, Tuple> rowConverter) {
+ this.cur = cur;
+ it = new IndexIterator(cur.iterator(), rowConverter);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() throws Exception {
+ cur.close();
+ }
+
+ /** {@inheritDoc} */
+ @NotNull
+ @Override
+ public Iterator<Tuple> iterator() {
+ return it;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Tuple next() {
+ return it.next();
+ }
+ }
+
+ /**
+ * Index store iterator adapter.
+ */
+ private static class IndexIterator implements Iterator<Tuple> {
+ private final Iterator<IndexRow> it;
+
+ private final Function<IndexRow, Tuple> rowConverter;
+
+ private IndexIterator(Iterator<IndexRow> it, Function<IndexRow, Tuple> rowConverter) {
+ this.it = it;
+ this.rowConverter = rowConverter;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ return it.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Tuple next() {
+ IndexRow r = it.next();
+
+ return rowConverter.apply(r);
+ }
+ }
+}
diff --git a/modules/runner/pom.xml b/modules/runner/pom.xml
index a5aa970..10b5735 100644
--- a/modules/runner/pom.xml
+++ b/modules/runner/pom.xml
@@ -95,6 +95,16 @@
</dependency>
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-index-api</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-index</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>info.picocli</groupId>
<artifactId>picocli</artifactId>
</dependency>
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
index 9d896ec..2ca14bc 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItTablesApiTest.java
@@ -595,13 +595,14 @@ public class ItTablesApiTest extends IgniteAbstractTest {
* @param shortTableName Table name.
*/
protected void addIndex(Ignite node, String schemaName, String shortTableName) {
- IndexDefinition idx = SchemaBuilders.hashIndex("testHI")
- .withColumns("valInt", "valStr")
+ IndexDefinition idx = SchemaBuilders.sortedIndex("testHI")
+ .addIndexColumn("valInt").done()
+ .addIndexColumn("valStr").done()
.build();
node.tables().alterTable(schemaName + "." + shortTableName, chng -> chng.changeIndices(idxes -> {
if (idxes.get(idx.name()) != null) {
- log.info("Index already exists [naem={}]", idx.name());
+ log.info("Index already exists [name={}]", idx.name());
throw new IndexAlreadyExistsException(idx.name());
}
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexDdlTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexDdlTest.java
new file mode 100644
index 0000000..45315eb
--- /dev/null
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItIndexDdlTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine;
+
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.sql.engine.util.QueryChecker.containsTableScan;
+import static org.hamcrest.Matchers.not;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.TableNotFoundException;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Group of tests that still has not been sorted out. It’s better to avoid extending this class with new tests.
+ */
+@Disabled("https://issues.apache.org/jira/browse/IGNITE-15655")
+public class ItIndexDdlTest extends AbstractBasicIntegrationTest {
+ @Test
+ public void indexBasic() {
+ sql("create table test_tbl (id int primary key, val0 int, val1 varchar, val2 int)");
+
+ sql("create index TEST_IDX on test_tbl (val1, val0)");
+
+ insertData(
+ "PUBLIC.TEST_TBL",
+ new String[] {"ID", "VAL0", "VAL1", "VAL2"},
+ new Object[] {0, 1, "val0", 0},
+ new Object[] {1, 2, "val1", 1},
+ new Object[] {2, 3, "val2", 2},
+ new Object[] {3, null, "val3", 3}
+ );
+
+ // Scan index only
+ assertQuery("SELECT VAL0, ID FROM test_tbl WHERE val0 > 1 and val1 > 'val' ORDER BY val1")
+ .matches(containsIndexScan("PUBLIC", "TEST_TBL", "TEST_IDX"))
+ .ordered()
+ .returns(2, 1)
+ .returns(3, 2)
+ .check();
+
+ // Scan index with lookup rows at the table
+ assertQuery("SELECT * FROM test_tbl WHERE val0 > 1 and val1 > 'val' ORDER BY val1")
+ .ordered()
+ .matches(containsIndexScan("PUBLIC", "TEST_TBL", "TEST_IDX"))
+ .returns(1, 2, "val1", 1)
+ .returns(2, 3, "val2", 2)
+ .check();
+
+ sql("drop index TEST_IDX");
+
+ assertQuery("SELECT * FROM test_tbl WHERE val0 > 1 and val1 > 'val' ORDER BY val1")
+ .ordered()
+ .matches(not(containsIndexScan("PUBLIC", "TEST_TBL", "TEST_IDX")))
+ .matches(containsTableScan("PUBLIC", "TEST_TBL"))
+ .returns(1, 2, "val1", 1)
+ .returns(2, 3, "val2", 2)
+ .check();
+ }
+
+ /**
+ * Tests create /drop index through DDL.
+ */
+ @Test
+ public void createDropIndex() {
+ String tblName = "createDropIdx";
+
+ String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions=1", tblName);
+
+ sql(newTblSql);
+
+ sql(String.format("CREATE INDEX index1 ON %s (c1)", tblName));
+
+ sql(String.format("CREATE INDEX IF NOT EXISTS index1 ON %s (c1)", tblName));
+
+ sql(String.format("CREATE INDEX index2 ON %s (c1)", tblName));
+
+ sql(String.format("CREATE INDEX index3 ON %s (c2)", tblName));
+
+ assertThrows(IndexAlreadyExistsException.class, () ->
+ sql(String.format("CREATE INDEX index3 ON %s (c1)", tblName)));
+
+ assertThrows(TableNotFoundException.class, () ->
+ sql(String.format("CREATE INDEX index_3 ON %s (c1)", tblName + "_nonExist")));
+
+ sql(String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", tblName));
+
+ sql(String.format("DROP INDEX index4"));
+
+ sql(String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", tblName));
+
+ sql(String.format("DROP INDEX index4"));
+
+ sql(String.format("DROP INDEX IF EXISTS index4"));
+ }
+}
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0a5afe3..cdf2ec1 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -41,6 +41,8 @@ import org.apache.ignite.internal.configuration.ConfigurationRegistry;
import org.apache.ignite.internal.configuration.ServiceLoaderModulesProvider;
import org.apache.ignite.internal.configuration.storage.DistributedConfigurationStorage;
import org.apache.ignite.internal.configuration.storage.LocalConfigurationStorage;
+import org.apache.ignite.internal.idx.IndexManager;
+import org.apache.ignite.internal.idx.IndexManagerImpl;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
@@ -131,6 +133,9 @@ public class IgniteImpl implements Ignite {
/** Distributed table manager. */
private final TableManager distributedTblMgr;
+ /** Index manager. */
+ private final IndexManagerImpl idxManager;
+
/** Rest module. */
private final RestModule restModule;
@@ -221,9 +226,15 @@ public class IgniteImpl implements Ignite {
txManager
);
+ idxManager = new IndexManagerImpl(
+ distributedTblMgr,
+ clusterCfgMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY)
+ );
+
qryEngine = new SqlQueryProcessor(
clusterSvc,
- distributedTblMgr
+ distributedTblMgr,
+ idxManager
);
restModule = new RestModule(nodeCfgMgr, clusterCfgMgr, nettyBootstrapFactory);
@@ -318,6 +329,7 @@ public class IgniteImpl implements Ignite {
clusterCfgMgr,
baselineMgr,
distributedTblMgr,
+ idxManager,
qryEngine,
restModule,
clientHandlerModule
@@ -352,7 +364,7 @@ public class IgniteImpl implements Ignite {
public void stop() {
if (status.getAndSet(Status.STOPPING) == Status.STARTED) {
doStopNode(List.of(vaultMgr, nodeCfgMgr, clusterSvc, raftMgr, txManager, metaStorageMgr, clusterCfgMgr, baselineMgr,
- distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
+ idxManager, distributedTblMgr, qryEngine, restModule, clientHandlerModule, nettyBootstrapFactory));
}
}
@@ -366,6 +378,10 @@ public class IgniteImpl implements Ignite {
return qryEngine;
}
+ public IndexManager indexManager() {
+ return idxManager;
+ }
+
/** {@inheritDoc} */
@Override
public IgniteTransactions transactions() {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index 985d92e..7dce744 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -74,6 +74,12 @@ public interface BinaryRow {
ByteBuffer valueSlice();
/**
+ * Get Binary row contains slice representing the key-only fields.
+ * Is used to lookup full row at a table by the key.
+ */
+ BinaryRow keyRow();
+
+ /**
* Writes binary row to given stream.
*
* @param stream Stream to write to.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index ca41aa2..6965227 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -173,6 +173,26 @@ public class ByteBufferRow implements BinaryRow {
}
/** {@inheritDoc} */
+ @Override
+ public BinaryRow keyRow() {
+ final int off = KEY_CHUNK_OFFSET;
+ final int len = readInteger(off);
+
+ try {
+ ByteBuffer res = ByteBuffer.wrap(buf.limit(off + len).position(0).slice().array());
+
+ res.putShort(FLAGS_FIELD_OFFSET, (short) (res.getShort(FLAGS_FIELD_OFFSET) | RowFlags.NO_VALUE_FLAG));
+ res.putShort(FLAGS_FIELD_OFFSET, (short) (res.getShort(FLAGS_FIELD_OFFSET) | RowFlags.NO_VALUE_FLAG));
+ res.order(ByteOrder.LITTLE_ENDIAN);
+
+ return new ByteBufferRow(res);
+ } finally {
+ buf.position(0); // Reset bounds.
+ buf.limit(buf.capacity());
+ }
+ }
+
+ /** {@inheritDoc} */
@Override public byte[] bytes() {
// TODO IGNITE-15934 avoid copy.
byte[] tmp = new byte[buf.limit()];
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
index 3bd724c..21b3ee2 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Column.java
@@ -251,6 +251,16 @@ public class Column {
return new Column(schemaIndex, columnOrder, name, type, nullable, defValSup);
}
+ /**
+ * Copy column with new order.
+ *
+ * @param order Column order.
+ * @return Column.
+ */
+ public Column copyWithOrder(int order) {
+ return new Column(-1, order, name, type, nullable, defValSup);
+ }
+
/** {@inheritDoc} */
@Override
public String toString() {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/builder/SortedIndexDefinitionBuilderImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/builder/SortedIndexDefinitionBuilderImpl.java
index 84500b5..8389e0a 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/builder/SortedIndexDefinitionBuilderImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/definition/builder/SortedIndexDefinitionBuilderImpl.java
@@ -17,7 +17,7 @@
package org.apache.ignite.internal.schema.definition.builder;
-import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -34,7 +34,7 @@ import org.apache.ignite.schema.definition.index.SortedIndexDefinition;
*/
public class SortedIndexDefinitionBuilderImpl extends AbstractIndexBuilder implements SortedIndexDefinitionBuilder {
/** Index columns. */
- protected final Map<String, SortedIndexColumnBuilderImpl> cols = new HashMap<>();
+ protected final Map<String, SortedIndexColumnBuilderImpl> cols = new LinkedHashMap<>();
/**
* Constructor.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 9ed3563..1b37772 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -837,6 +837,12 @@ public class Row implements BinaryRow, SchemaAware {
/** {@inheritDoc} */
@Override
+ public BinaryRow keyRow() {
+ return row.keyRow();
+ }
+
+ /** {@inheritDoc} */
+ @Override
public void writeTo(OutputStream stream) throws IOException {
row.writeTo(stream);
}
diff --git a/modules/sql-engine/pom.xml b/modules/sql-engine/pom.xml
index 8520a4e..d13b07c 100644
--- a/modules/sql-engine/pom.xml
+++ b/modules/sql-engine/pom.xml
@@ -53,6 +53,11 @@
</dependency>
<dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-index-api</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
@@ -184,6 +189,12 @@
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-index</artifactId>
+ <scope>test</scope>
+ </dependency>
+
<!-- Logging in tests -->
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 88ed145..001d085 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -25,6 +25,9 @@ import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.util.Pair;
+import org.apache.ignite.internal.idx.IndexManager;
+import org.apache.ignite.internal.idx.event.IndexEvent;
+import org.apache.ignite.internal.idx.event.IndexEventParameters;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.sql.engine.exec.ArrayRowHandler;
import org.apache.ignite.internal.sql.engine.exec.ExecutionService;
@@ -58,7 +61,9 @@ public class SqlQueryProcessor implements QueryProcessor {
private final ClusterService clusterSrvc;
- private final TableManager tableManager;
+ private final TableManager tblManager;
+
+ private final IndexManager idxManager;
/** Busy lock for stop synchronisation. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
@@ -66,8 +71,11 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Keeps queries plans to avoid expensive planning of the same queries. */
private final QueryPlanCache planCache = new QueryPlanCacheImpl(PLAN_CACHE_SIZE);
- /** Event listeners to close. */
- private final List<Pair<TableEvent, EventListener<TableEventParameters>>> evtLsnrs = new ArrayList<>();
+ /** Table event listeners to close. */
+ private final List<Pair<TableEvent, EventListener<TableEventParameters>>> tblEvtLsnrs = new ArrayList<>();
+
+ /** Index event listeners to close. */
+ private final List<Pair<IndexEvent, EventListener<IndexEventParameters>>> idxEvtLsnrs = new ArrayList<>();
private volatile ExecutionService executionSrvc;
@@ -77,12 +85,21 @@ public class SqlQueryProcessor implements QueryProcessor {
private volatile Map<String, SqlExtension> extensions;
+ /**
+ * Create SQL query processor.
+ *
+ * @param clusterSrvc Cluster service.
+ * @param tblManager Table manager.
+ * @param idxManager Index manager.
+ */
public SqlQueryProcessor(
ClusterService clusterSrvc,
- TableManager tableManager
+ TableManager tblManager,
+ IndexManager idxManager
) {
this.clusterSrvc = clusterSrvc;
- this.tableManager = tableManager;
+ this.tblManager = tblManager;
+ this.idxManager = idxManager;
}
/** {@inheritDoc} */
@@ -106,14 +123,15 @@ public class SqlQueryProcessor implements QueryProcessor {
extensions = extensionList.stream().collect(Collectors.toMap(SqlExtension::name, Function.identity()));
- SqlSchemaManagerImpl schemaHolder = new SqlSchemaManagerImpl(tableManager, planCache::clear);
+ SqlSchemaManagerImpl schemaHolder = new SqlSchemaManagerImpl(tblManager, planCache::clear);
executionSrvc = new ExecutionServiceImpl<>(
clusterSrvc.topologyService(),
msgSrvc,
planCache,
schemaHolder,
- tableManager,
+ tblManager,
+ idxManager,
taskExecutor,
ArrayRowHandler.INSTANCE,
extensions
@@ -123,6 +141,9 @@ public class SqlQueryProcessor implements QueryProcessor {
registerTableListener(TableEvent.ALTER, new TableUpdatedListener(schemaHolder));
registerTableListener(TableEvent.DROP, new TableDroppedListener(schemaHolder));
+ registerIndexListener(IndexEvent.CREATE, new IndexCreatedListener(schemaHolder));
+ registerIndexListener(IndexEvent.DROP, new IndexDroppedListener(schemaHolder));
+
taskExecutor.start();
msgSrvc.start();
executionSrvc.start();
@@ -132,9 +153,15 @@ public class SqlQueryProcessor implements QueryProcessor {
}
private void registerTableListener(TableEvent evt, AbstractTableEventListener lsnr) {
- evtLsnrs.add(Pair.of(evt, lsnr));
+ tblEvtLsnrs.add(Pair.of(evt, lsnr));
+
+ tblManager.listen(evt, lsnr);
+ }
+
+ private void registerIndexListener(IndexEvent evt, AbstractIndexEventListener lsnr) {
+ idxEvtLsnrs.add(Pair.of(evt, lsnr));
- tableManager.listen(evt, lsnr);
+ idxManager.listen(evt, lsnr);
}
/** {@inheritDoc} */
@@ -160,8 +187,12 @@ public class SqlQueryProcessor implements QueryProcessor {
planCache::stop
);
- Stream<AutoCloseable> closableListeners = evtLsnrs.stream()
- .map((p) -> () -> tableManager.removeListener(p.left, p.right));
+ Stream<AutoCloseable> closableListeners = Stream.concat(
+ tblEvtLsnrs.stream()
+ .map((p) -> () -> tblManager.removeListener(p.left, p.right)),
+ idxEvtLsnrs.stream()
+ .map((p) -> () -> idxManager.removeListener(p.left, p.right))
+ );
toClose.addAll(
Stream.concat(closableComponents, closableListeners).collect(Collectors.toList())
@@ -187,9 +218,7 @@ public class SqlQueryProcessor implements QueryProcessor {
private abstract static class AbstractTableEventListener implements EventListener<TableEventParameters> {
protected final SqlSchemaManagerImpl schemaHolder;
- private AbstractTableEventListener(
- SqlSchemaManagerImpl schemaHolder
- ) {
+ private AbstractTableEventListener(SqlSchemaManagerImpl schemaHolder) {
this.schemaHolder = schemaHolder;
}
@@ -201,15 +230,17 @@ public class SqlQueryProcessor implements QueryProcessor {
}
private static class TableCreatedListener extends AbstractTableEventListener {
- private TableCreatedListener(
- SqlSchemaManagerImpl schemaHolder
- ) {
+ private TableCreatedListener(SqlSchemaManagerImpl schemaHolder) {
super(schemaHolder);
}
/** {@inheritDoc} */
@Override
public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
+ if (exception != null) {
+ return false;
+ }
+
schemaHolder.onTableCreated(
"PUBLIC",
parameters.table()
@@ -220,15 +251,17 @@ public class SqlQueryProcessor implements QueryProcessor {
}
private static class TableUpdatedListener extends AbstractTableEventListener {
- private TableUpdatedListener(
- SqlSchemaManagerImpl schemaHolder
- ) {
+ private TableUpdatedListener(SqlSchemaManagerImpl schemaHolder) {
super(schemaHolder);
}
/** {@inheritDoc} */
@Override
public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
+ if (exception != null) {
+ return false;
+ }
+
schemaHolder.onTableUpdated(
"PUBLIC",
parameters.table()
@@ -239,15 +272,17 @@ public class SqlQueryProcessor implements QueryProcessor {
}
private static class TableDroppedListener extends AbstractTableEventListener {
- private TableDroppedListener(
- SqlSchemaManagerImpl schemaHolder
- ) {
+ private TableDroppedListener(SqlSchemaManagerImpl schemaHolder) {
super(schemaHolder);
}
/** {@inheritDoc} */
@Override
public boolean notify(@NotNull TableEventParameters parameters, @Nullable Throwable exception) {
+ if (exception != null) {
+ return false;
+ }
+
schemaHolder.onTableDropped(
"PUBLIC",
parameters.tableName()
@@ -256,4 +291,62 @@ public class SqlQueryProcessor implements QueryProcessor {
return false;
}
}
+
+ private abstract static class AbstractIndexEventListener implements EventListener<IndexEventParameters> {
+ protected final SqlSchemaManagerImpl schemaHolder;
+
+ private AbstractIndexEventListener(SqlSchemaManagerImpl schemaHolder) {
+ this.schemaHolder = schemaHolder;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void remove(@NotNull Throwable exception) {
+ // No-op.
+ }
+ }
+
+ private static class IndexCreatedListener extends AbstractIndexEventListener {
+ private IndexCreatedListener(SqlSchemaManagerImpl schemaHolder) {
+ super(schemaHolder);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) {
+ if (exception != null) {
+ return false;
+ }
+
+ schemaHolder.onIndexCreated(
+ "PUBLIC",
+ parameters.tableName(),
+ parameters.index()
+ );
+
+ return false;
+ }
+ }
+
+ private static class IndexDroppedListener extends AbstractIndexEventListener {
+ private IndexDroppedListener(SqlSchemaManagerImpl schemaHolder) {
+ super(schemaHolder);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean notify(@NotNull IndexEventParameters parameters, @Nullable Throwable exception) {
+ if (exception != null) {
+ return false;
+ }
+
+ schemaHolder.onIndexDropped(
+ "PUBLIC",
+ parameters.tableName(),
+ parameters.indexName()
+ );
+
+ return false;
+ }
+ }
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
index 028525a..783aa9f 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/AbstractIndexScan.java
@@ -87,7 +87,9 @@ public abstract class AbstractIndexScan<RowT, IdxRowT> implements Iterable<RowT>
this::indexRow2Row
);
- it = new FilteringIterator<>(it, filters);
+ if (filters != null) {
+ it = new FilteringIterator<>(it, filters);
+ }
if (rowTransformer != null) {
it = new TransformingIterator<>(it, rowTransformer);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
index 4ca2422..7b996f3 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ExecutionServiceImpl.java
@@ -49,6 +49,7 @@ import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.tools.Frameworks;
import org.apache.calcite.tools.ValidationException;
+import org.apache.ignite.internal.idx.IndexManager;
import org.apache.ignite.internal.sql.engine.ResultSetMetadata;
import org.apache.ignite.internal.sql.engine.SqlCursor;
import org.apache.ignite.internal.sql.engine.exec.ddl.DdlCommandHandler;
@@ -147,8 +148,6 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
private final Map<String, SqlExtension> extensions;
- private final TableManager tableManager;
-
private final DdlCommandHandler ddlCmdHnd;
/**
@@ -161,6 +160,7 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
QueryPlanCache planCache,
SqlSchemaManager sqlSchemaManager,
TableManager tblManager,
+ IndexManager idxManager,
QueryTaskExecutor taskExecutor,
RowHandler<RowT> handler,
Map<String, SqlExtension> extensions
@@ -171,9 +171,8 @@ public class ExecutionServiceImpl<RowT> implements ExecutionService {
this.sqlSchemaManager = sqlSchemaManager;
this.taskExecutor = taskExecutor;
this.extensions = extensions;
- tableManager = tblManager;
- ddlCmdHnd = new DdlCommandHandler(tableManager);
+ ddlCmdHnd = new DdlCommandHandler(tblManager, idxManager);
locNodeId = topSrvc.localMember().id();
qryPlanCache = planCache;
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/IndexScan.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/IndexScan.java
new file mode 100644
index 0000000..7497276
--- /dev/null
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/IndexScan.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec;
+
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.idx.InternalSortedIndex;
+import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
+import org.apache.ignite.internal.util.Cursor;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.Tuple;
+
+/**
+ * Scan on index.
+ */
+public class IndexScan<RowT> extends AbstractIndexScan<RowT, Tuple> {
+ private final RowFactory<RowT> tableRowFactory;
+
+ /**
+ * Creates index scan.
+ */
+ public IndexScan(
+ IgniteIndex idx,
+ ExecutionContext<RowT> ectx,
+ ColocationGroup colocationGrp,
+ Predicate<RowT> filters,
+ Supplier<RowT> lower,
+ Supplier<RowT> upper,
+ Function<RowT, RowT> rowTransformer,
+ ImmutableBitSet requiredColumns
+ ) {
+ super(
+ ectx,
+ idx.table().getRowType(ectx.getTypeFactory(), requiredColumns),
+ new TreeIndexWrapper(idx.index(), requiredColumns.toBitSet()),
+ filters,
+ lower,
+ upper,
+ rowTransformer
+ );
+
+ tableRowFactory = ectx.rowHandler().factory(ectx.getTypeFactory(), idx.table().getRowType(ectx.getTypeFactory()));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public synchronized Iterator<RowT> iterator() {
+ try {
+ return super.iterator();
+ } catch (Exception e) {
+ throw e;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected Tuple row2indexRow(RowT bound) {
+ if (bound == null) {
+ return null;
+ }
+
+ RowHandler<RowT> hnd = ectx.rowHandler();
+
+ Tuple t = Tuple.create();
+
+ for (int i = 0; i < hnd.columnCount(bound); ++i) {
+ t.set("idx" + i, hnd.get(i, bound));
+ }
+
+ return t;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ protected RowT indexRow2Row(Tuple t) {
+ RowT row = tableRowFactory.create();
+
+ RowHandler<RowT> hnd = ectx.rowHandler();
+
+ for (int i = 0; i < t.columnCount(); ++i) {
+ hnd.set(i, row, t.value(i));
+ }
+
+ return row;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close() {
+ }
+
+ /**
+ * Sorted index wrapper.
+ */
+ private static class TreeIndexWrapper implements TreeIndex<Tuple> {
+ /** Underlying index. */
+ private final InternalSortedIndex idx;
+
+ private final BitSet requiredColumns;
+
+ /**
+ * Creates wrapper for InternalSortedIndex.
+ */
+ private TreeIndexWrapper(InternalSortedIndex idx, BitSet requiredColumns) {
+ this.idx = idx;
+ this.requiredColumns = requiredColumns;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Cursor<Tuple> find(Tuple lower, Tuple upper) {
+ try {
+ return idx.scan(
+ lower,
+ upper,
+ (byte) (InternalSortedIndex.GREATER_OR_EQUAL | InternalSortedIndex.LESS_OR_EQUAL),
+ requiredColumns
+ );
+ } catch (Exception e) {
+ throw new IgniteException("Failed to find index rows", e);
+ }
+ }
+ }
+}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
index f107252..09076ec 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/LogicalRelImplementor.java
@@ -97,6 +97,7 @@ import org.apache.ignite.internal.sql.engine.rel.agg.IgniteReduceSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleHashAggregate;
import org.apache.ignite.internal.sql.engine.rel.agg.IgniteSingleSortAggregate;
import org.apache.ignite.internal.sql.engine.rel.set.IgniteSetOp;
+import org.apache.ignite.internal.sql.engine.schema.IgniteIndex;
import org.apache.ignite.internal.sql.engine.schema.InternalIgniteTable;
import org.apache.ignite.internal.sql.engine.trait.Destination;
import org.apache.ignite.internal.sql.engine.trait.IgniteDistribution;
@@ -284,30 +285,36 @@ public class LogicalRelImplementor<RowT> implements IgniteRelVisitor<Node<RowT>>
/** {@inheritDoc} */
@Override
public Node<RowT> visit(IgniteIndexScan rel) {
- // TODO: fix this
- // RexNode condition = rel.condition();
- // List<RexNode> projects = rel.projects();
+ RexNode condition = rel.condition();
+ List<RexNode> projects = rel.projects();
InternalIgniteTable tbl = rel.getTable().unwrap(InternalIgniteTable.class);
IgniteTypeFactory typeFactory = ctx.getTypeFactory();
ImmutableBitSet requiredColumns = rel.requiredColumns();
- // List<RexNode> lowerCond = rel.lowerBound();
- // List<RexNode> upperCond = rel.upperBound();
+ List<RexNode> lowerCond = rel.lowerBound();
+ List<RexNode> upperCond = rel.upperBound();
RelDataType rowType = tbl.getRowType(typeFactory, requiredColumns);
- // Predicate<Row> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
- // Supplier<Row> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
- // Supplier<Row> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
- // Function<Row, Row> prj = projects == null ? null : expressionFactory.project(projects, rowType);
- //
- // IgniteIndex idx = tbl.getIndex(rel.indexName());
- //
- // ColocationGroup group = ctx.group(rel.sourceId());
-
- Iterable<RowT> rowsIter = (Iterable<RowT>) List.of(new Object[]{0, 0},
- new Object[]{1, 1}); //idx.scan(ctx, group, filters, lower, upper, prj, requiredColumns);
+ Predicate<RowT> filters = condition == null ? null : expressionFactory.predicate(condition, rowType);
+ Supplier<RowT> lower = lowerCond == null ? null : expressionFactory.rowSource(lowerCond);
+ Supplier<RowT> upper = upperCond == null ? null : expressionFactory.rowSource(upperCond);
+ Function<RowT, RowT> prj = projects == null ? null : expressionFactory.project(projects, rowType);
+
+ IgniteIndex idx = tbl.getIndex(rel.indexName());
+
+ ColocationGroup group = ctx.group(rel.sourceId());
+
+ Iterable<RowT> rowsIter = idx.scan(
+ ctx,
+ group,
+ filters,
+ lower,
+ upper,
+ prj,
+ requiredColumns != null ? requiredColumns : ImmutableBitSet.range(rowType.getFieldCount())
+ );
return new ScanNode<>(ctx, rowType, rowsIter);
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
index ecca66f..f6e3155 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndex.java
@@ -29,6 +29,8 @@ import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -41,6 +43,8 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
protected final Comparator<RowT> comp;
+ protected final RelDataType rowType;
+
private final RelCollation collation;
private final ArrayList<RowT> rows = new ArrayList<>();
@@ -54,11 +58,13 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
*/
public RuntimeSortedIndex(
ExecutionContext<RowT> ectx,
+ RelDataType rowType,
RelCollation collation,
Comparator<RowT> comp
) {
this.ectx = ectx;
this.comp = comp;
+ this.rowType = rowType;
assert Objects.nonNull(collation);
@@ -207,7 +213,15 @@ public class RuntimeSortedIndex<RowT> implements RuntimeIndex<RowT>, TreeIndex<R
/** {@inheritDoc} */
@Override
protected RowT row2indexRow(RowT bound) {
- return bound;
+ RowHandler<RowT> hndl = ectx.rowHandler();
+ RowT rowBound = hndl.factory(Commons.typeFactory(), rowType).create();
+
+ ImmutableIntList keys = collation.getKeys();
+ for (int i = 0; i < keys.size(); ++i) {
+ hndl.set(keys.get(i), rowBound, hndl.get(i, bound));
+ }
+
+ return rowBound;
}
/** {@inheritDoc} */
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
index 2d30895..f5c0a12 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/ddl/DdlCommandHandler.java
@@ -32,6 +32,7 @@ import org.apache.ignite.configuration.NamedListView;
import org.apache.ignite.configuration.schemas.table.ColumnView;
import org.apache.ignite.configuration.schemas.table.PrimaryKeyView;
import org.apache.ignite.configuration.schemas.table.TableChange;
+import org.apache.ignite.internal.idx.IndexManager;
import org.apache.ignite.internal.schema.definition.TableDefinitionImpl;
import org.apache.ignite.internal.sql.engine.prepare.PlanningContext;
import org.apache.ignite.internal.sql.engine.prepare.ddl.AbstractTableDdlCommand;
@@ -54,6 +55,7 @@ import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteStringFormatter;
import org.apache.ignite.lang.IndexAlreadyExistsException;
+import org.apache.ignite.lang.IndexNotFoundException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.schema.SchemaBuilders;
@@ -64,10 +66,13 @@ import org.apache.ignite.schema.definition.builder.SortedIndexDefinitionBuilder.
/** DDL commands handler. */
public class DdlCommandHandler {
- private final TableManager tableManager;
+ private final TableManager tblManager;
- public DdlCommandHandler(TableManager tblManager) {
- tableManager = tblManager;
+ private final IndexManager idxManager;
+
+ public DdlCommandHandler(TableManager tblManager, IndexManager idxManager) {
+ this.tblManager = tblManager;
+ this.idxManager = idxManager;
}
/** Planning context. */
@@ -154,7 +159,7 @@ public class DdlCommandHandler {
);
try {
- tableManager.createTable(fullName, tblChanger);
+ tblManager.createTable(fullName, tblChanger);
} catch (TableAlreadyExistsException ex) {
if (!cmd.ifTableExists()) {
throw ex;
@@ -169,7 +174,7 @@ public class DdlCommandHandler {
IgniteObjectName.quote(cmd.tableName())
);
try {
- tableManager.dropTable(fullName);
+ tblManager.dropTable(fullName);
} catch (TableNotFoundException ex) {
if (!cmd.ifTableExists()) {
throw ex;
@@ -237,33 +242,35 @@ public class DdlCommandHandler {
IgniteObjectName.quote(cmd.tableName())
);
- tableManager.alterTable(fullName, chng -> chng.changeIndices(idxes -> {
- if (idxes.get(cmd.indexName()) != null) {
- if (!cmd.ifIndexNotExists()) {
- throw new IndexAlreadyExistsException(cmd.indexName());
- } else {
- return;
- }
+ try {
+ idxManager.createIndex(cmd.indexName(), fullName, idxCh -> convert(idx.build(), idxCh));
+ } catch (IndexAlreadyExistsException e) {
+ if (!cmd.ifIndexNotExists()) {
+ throw e;
}
-
- idxes.create(cmd.indexName(), tableIndexChange -> convert(idx.build(), tableIndexChange));
- }));
+ }
}
/** Handles drop index command. */
private void handleDropIndex(DropIndexCommand cmd) {
- throw new UnsupportedOperationException("DROP INDEX command not supported for now.");
+ try {
+ idxManager.dropIndex(cmd.indexName());
+ } catch (IndexNotFoundException e) {
+ if (!cmd.ifExist()) {
+ throw e;
+ }
+ }
}
/**
* Adds a column according to the column definition.
*
* @param fullName Table with schema name.
- * @param colsDef Columns defenitions.
+ * @param colsDef Columns definitions.
* @param colNotExist Flag indicates exceptionally behavior in case of already existing column.
*/
private void addColumnInternal(String fullName, List<ColumnDefinition> colsDef, boolean colNotExist) {
- tableManager.alterTable(
+ tblManager.alterTable(
fullName,
chng -> chng.changeColumns(cols -> {
Map<String, String> colNamesToOrders = columnOrdersToNames(chng.columns());
@@ -304,7 +311,7 @@ public class DdlCommandHandler {
* @param colExist Flag indicates exceptionally behavior in case of already existing column.
*/
private void dropColumnInternal(String fullName, Set<String> colNames, boolean colExist) {
- tableManager.alterTable(
+ tblManager.alterTable(
fullName,
chng -> chng.changeColumns(cols -> {
PrimaryKeyView priKey = chng.primaryKey();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
index 7289111..da0ce40 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/IndexSpoolNode.java
@@ -174,7 +174,7 @@ public class IndexSpoolNode<RowT> extends AbstractNode<RowT> implements SingleNo
Supplier<RowT> lowerIdxBound,
Supplier<RowT> upperIdxBound
) {
- RuntimeSortedIndex<RowT> idx = new RuntimeSortedIndex<>(ctx, collation, comp);
+ RuntimeSortedIndex<RowT> idx = new RuntimeSortedIndex<>(ctx, rowType, collation, comp);
ScanNode<RowT> scan = new ScanNode<>(
ctx,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
index 74539d1..ccada61 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/rule/FilterSpoolMergeToSortedIndexSpoolRule.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.sql.engine.rule;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
@@ -77,7 +78,7 @@ public class FilterSpoolMergeToSortedIndexSpoolRule extends RelRule<FilterSpoolM
return;
}
- RelCollation collation = TraitUtils.createCollation(idxCond.keys());
+ RelCollation collation = TraitUtils.createCollation(IntArrayList.of(idxCond.keys().toIntArray()));
RelNode res = new IgniteSortedIndexSpool(
cluster,
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
index cda9d87..e300413 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
@@ -17,7 +17,15 @@
package org.apache.ignite.internal.sql.engine.schema;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.function.Supplier;
import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.util.ImmutableBitSet;
+import org.apache.ignite.internal.idx.InternalSortedIndex;
+import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
+import org.apache.ignite.internal.sql.engine.exec.IndexScan;
+import org.apache.ignite.internal.sql.engine.metadata.ColocationGroup;
/**
* Ignite scannable index.
@@ -27,7 +35,8 @@ public class IgniteIndex {
private final String idxName;
- // private final GridIndex<H2Row> idx;
+ private final InternalSortedIndex idx;
+
private final InternalIgniteTable tbl;
/**
@@ -35,8 +44,25 @@ public class IgniteIndex {
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public IgniteIndex(RelCollation collation, String name, InternalIgniteTable tbl) {
+ this(collation, name, null, tbl);
+ }
+
+ /**
+ * Constructor.
+ * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ */
+ public IgniteIndex(RelCollation collation, InternalSortedIndex idx, InternalIgniteTable tbl) {
+ this(collation, idx.name(), idx, tbl);
+ }
+
+ /**
+ * Constructor.
+ * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ */
+ private IgniteIndex(RelCollation collation, String name, InternalSortedIndex idx, InternalIgniteTable tbl) {
this.collation = collation;
idxName = name;
+ this.idx = idx;
this.tbl = tbl;
}
@@ -51,4 +77,32 @@ public class IgniteIndex {
public InternalIgniteTable table() {
return tbl;
}
+
+ public InternalSortedIndex index() {
+ return idx;
+ }
+
+ /**
+ * Scan index.
+ */
+ public <RowT> Iterable<RowT> scan(
+ ExecutionContext<RowT> ectx,
+ ColocationGroup colocationGrp,
+ Predicate<RowT> filters,
+ Supplier<RowT> lower,
+ Supplier<RowT> upper,
+ Function<RowT, RowT> rowTransformer,
+ ImmutableBitSet requiredColumns
+ ) {
+ return new IndexScan<>(
+ this,
+ ectx,
+ colocationGrp,
+ filters,
+ lower,
+ upper,
+ rowTransformer,
+ requiredColumns
+ );
+ }
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
index 861f6be..0504dd0 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteSchema.java
@@ -73,4 +73,14 @@ public class IgniteSchema extends AbstractSchema {
public void removeTable(String tblName) {
tblMap.remove(tblName);
}
+
+ /**
+ * Return internal table.
+ *
+ * @param name Table's name.
+ * @return Internal table.
+ */
+ public InternalIgniteTable internalTable(String name) {
+ return tblMap.get(name);
+ }
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
index 08d0251..75f5c59 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteTableImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.sql.engine.schema;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
@@ -192,6 +193,15 @@ public class IgniteTableImpl extends AbstractTable implements InternalIgniteTabl
indexes.put(idxTbl.name(), idxTbl);
}
+ /**
+ * Add indexes.
+ *
+ * @param idxs Indexes collection.
+ */
+ public void addIndexes(Collection<IgniteIndex> idxs) {
+ idxs.forEach(idx -> indexes.put(idx.name(), idx));
+ }
+
/** {@inheritDoc} */
@Override
public IgniteIndex getIndex(String idxName) {
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
index ed33d0b..e5bd3a9 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/SqlSchemaManagerImpl.java
@@ -24,11 +24,16 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.RelFieldCollation.Direction;
+import org.apache.calcite.rel.RelFieldCollation.NullDirection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.tools.Frameworks;
+import org.apache.ignite.internal.idx.InternalSortedIndex;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.sql.engine.extension.SqlExtension.ExternalCatalog;
@@ -154,8 +159,17 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
String schemaName,
TableImpl table
) {
+ IgniteTableImpl igniteTable = createTable(schemaName, table);
+
IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
+ schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
+ tablesById.put(igniteTable.id(), igniteTable);
+
+ rebuild();
+ }
+
+ private IgniteTableImpl createTable(String schemaName, TableImpl table) {
SchemaDescriptor descriptor = table.schemaView().schema();
List<ColumnDescriptor> colDescriptors = descriptor.columnNames().stream()
@@ -171,16 +185,11 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
))
.collect(Collectors.toList());
- IgniteTableImpl igniteTable = new IgniteTableImpl(
+ return new IgniteTableImpl(
new TableDescriptorImpl(colDescriptors),
table.internalTable(),
table.schemaView()
);
-
- schema.addTable(removeSchema(schemaName, table.name()), igniteTable);
- tablesById.put(igniteTable.id(), igniteTable);
-
- rebuild();
}
/**
@@ -191,7 +200,22 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
String schemaName,
TableImpl table
) {
- onTableCreated(schemaName, table);
+ IgniteTableImpl igniteTable = createTable(schemaName, table);
+
+ IgniteSchema schema = igniteSchemas.computeIfAbsent(schemaName, IgniteSchema::new);
+
+ String tblName = removeSchema(schemaName, table.name());
+
+ // Rebuild indexes collation.
+ igniteTable.addIndexes(schema.internalTable(tblName).indexes().values().stream()
+ .map(idx -> createIndex(idx.index(), igniteTable))
+ .collect(Collectors.toList())
+ );
+
+ schema.addTable(tblName, igniteTable);
+ tablesById.put(igniteTable.id(), igniteTable);
+
+ rebuild();
}
/**
@@ -213,6 +237,49 @@ public class SqlSchemaManagerImpl implements SqlSchemaManager {
rebuild();
}
+ /**
+ * Build new SQL schema when new index is created.
+ */
+ public void onIndexCreated(String schema, String tblName, InternalSortedIndex idx) {
+ InternalIgniteTable tbl = igniteSchemas.get(schema).internalTable(removeSchema(schema, tblName));
+
+ tbl.addIndex(createIndex(idx, tbl));
+
+ rebuild();
+ }
+
+ /**
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-15480
+ * columns mapping should be masted on column ID instead of column name.
+ */
+ private IgniteIndex createIndex(InternalSortedIndex idx, InternalIgniteTable tbl) {
+ List<RelFieldCollation> idxFieldsCollation = idx.descriptor().columns().stream()
+ .map(c ->
+ new RelFieldCollation(
+ tbl.descriptor().columnDescriptor(c.column().name()).logicalIndex(),
+ c.asc() ? Direction.ASCENDING : Direction.DESCENDING,
+ NullDirection.FIRST
+ )
+ ).collect(Collectors.toList());
+
+ return new IgniteIndex(
+ RelCollations.of(idxFieldsCollation),
+ idx,
+ tbl
+ );
+ }
+
+ /**
+ * Build new SQL schema when existed index is dropped.
+ */
+ public void onIndexDropped(String schema, String tblName, String idxName) {
+ InternalIgniteTable tbl = igniteSchemas.get(schema).internalTable(removeSchema(schema, tblName));
+
+ tbl.removeIndex(idxName);
+
+ rebuild();
+ }
+
private void rebuild() {
SchemaPlus newCalciteSchema = Frameworks.createRootSchema(false);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
index 6a3f816..5ec2a1c 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/trait/TraitUtils.java
@@ -28,7 +28,7 @@ import static org.apache.ignite.internal.util.CollectionUtils.first;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
-import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.ints.IntList;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
@@ -466,7 +466,7 @@ public class TraitUtils {
* @param keys The keys to create collation from.
* @return New collation.
*/
- public static RelCollation createCollation(IntSet keys) {
+ public static RelCollation createCollation(IntList keys) {
return RelCollations.of(
keys.intStream().mapToObj(RelFieldCollation::new).collect(Collectors.toList())
);
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java
index e1f6612..045031b 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/IndexConditions.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.sql.engine.util;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import it.unimi.dsi.fastutil.ints.IntSet;
import it.unimi.dsi.fastutil.ints.IntSets;
import java.util.List;
@@ -40,12 +39,14 @@ public class IndexConditions {
private final List<RexNode> upperBound;
+ private final IntSet keys;
+
/**
* Constructor.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public IndexConditions() {
- this(null, null, null, null);
+ this(null, null, null, null, null);
}
/**
@@ -56,12 +57,14 @@ public class IndexConditions {
@Nullable List<RexNode> lowerCond,
@Nullable List<RexNode> upperCond,
@Nullable List<RexNode> lowerBound,
- @Nullable List<RexNode> upperBound
+ @Nullable List<RexNode> upperBound,
+ @Nullable IntSet keys
) {
this.lowerCond = lowerCond;
this.upperCond = upperCond;
this.lowerBound = lowerBound;
this.upperBound = upperBound;
+ this.keys = keys;
}
/**
@@ -71,6 +74,7 @@ public class IndexConditions {
public IndexConditions(RelInput input) {
lowerCond = null;
upperCond = null;
+ keys = null;
lowerBound = input.get("lower") == null ? null : input.getExpressionList("lower");
upperBound = input.get("upper") == null ? null : input.getExpressionList("upper");
}
@@ -108,22 +112,7 @@ public class IndexConditions {
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
public IntSet keys() {
- if (upperBound == null && lowerBound == null) {
- return IntSets.EMPTY_SET;
- }
-
- IntSet keys = new IntOpenHashSet();
-
- int cols = lowerBound != null ? lowerBound.size() : upperBound.size();
-
- for (int i = 0; i < cols; ++i) {
- if (upperBound != null && RexUtils.isNotNull(upperBound.get(i))
- || lowerBound != null && RexUtils.isNotNull(lowerBound.get(i))) {
- keys.add(i);
- }
- }
-
- return IntSets.unmodifiable(keys);
+ return keys == null ? IntSets.EMPTY_SET : keys;
}
/**
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
index 3fdc23c..b699191 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/util/RexUtils.java
@@ -310,19 +310,25 @@ public class RexUtils {
List<RexNode> lowerBound = null;
List<RexNode> upperBound = null;
+ List<RexNode> allIdxCondition = new ArrayList<>();
+
if (!nullOrEmpty(lower)) {
lowerBound = asBound(cluster, lower, rowType, mapping);
+ allIdxCondition.addAll(lower);
} else {
lower = null;
}
if (!nullOrEmpty(upper)) {
upperBound = asBound(cluster, upper, rowType, mapping);
+ allIdxCondition.addAll(upper);
} else {
upper = null;
}
- return new IndexConditions(lower, upper, lowerBound, upperBound);
+ IntSet keys = indexKeys(allIdxCondition, mapping);
+
+ return new IndexConditions(lower, upper, lowerBound, upperBound, keys);
}
/**
@@ -470,7 +476,8 @@ public class RexUtils {
RexBuilder builder = builder(cluster);
List<RelDataType> types = RelOptUtil.getFieldTypeList(rowType);
- List<RexNode> res = makeListOfNullLiterals(builder, types);
+
+ List<RexNode> res = new ArrayList<>();
for (RexNode pred : idxCond) {
assert pred instanceof RexCall;
@@ -485,13 +492,40 @@ public class RexUtils {
assert index != -1;
- res.set(index, makeCast(builder, cond, types.get(index)));
+ res.add(makeCast(builder, cond, types.get(index)));
}
return res;
}
/**
+ * Index keys used by condition.
+ * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ */
+ public static IntSet indexKeys(Iterable<RexNode> idxCond, @Nullable Mappings.TargetMapping mapping) {
+ if (nullOrEmpty(idxCond)) {
+ return null;
+ }
+
+ IntSet keys = new IntOpenHashSet();
+
+ for (RexNode pred : idxCond) {
+ assert pred instanceof RexCall;
+
+ RexCall call = (RexCall) pred;
+ RexSlot ref = (RexSlot) RexUtil.removeCast(call.operands.get(0));
+
+ int index = mapping == null ? ref.getIndex() : mapping.getSourceOpt(ref.getIndex());
+
+ assert index != -1;
+
+ keys.add(index);
+ }
+
+ return keys;
+ }
+
+ /**
* Permutation.
* TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
*/
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
index 042c025..af66f32 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/StopCalciteModuleTest.java
@@ -35,6 +35,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Flow;
+import org.apache.ignite.internal.idx.IndexManager;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
@@ -82,6 +83,9 @@ public class StopCalciteModuleTest {
TableManager tableManager;
@Mock
+ IndexManager idxManager;
+
+ @Mock
MessagingService msgSrvc;
@Mock
@@ -168,7 +172,7 @@ public class StopCalciteModuleTest {
@Test
public void testStopQueryOnNodeStop() throws Exception {
- SqlQueryProcessor qryProc = new SqlQueryProcessor(clusterSrvc, tableManager);
+ SqlQueryProcessor qryProc = new SqlQueryProcessor(clusterSrvc, tableManager, idxManager);
when(tbl.tableId()).thenReturn(UUID.randomUUID());
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
index d234828..b6c44b4 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/MockedStructuresTest.java
@@ -42,6 +42,8 @@ import org.apache.ignite.internal.baseline.BaselineManager;
import org.apache.ignite.internal.configuration.schema.ExtendedTableConfigurationSchema;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.idx.IndexManager;
+import org.apache.ignite.internal.idx.IndexManagerImpl;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaUtils;
@@ -57,7 +59,6 @@ import org.apache.ignite.lang.ColumnAlreadyExistsException;
import org.apache.ignite.lang.ColumnNotFoundException;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.IgniteInternalException;
-import org.apache.ignite.lang.IndexAlreadyExistsException;
import org.apache.ignite.lang.NodeStoppingException;
import org.apache.ignite.lang.TableAlreadyExistsException;
import org.apache.ignite.lang.TableNotFoundException;
@@ -72,7 +73,6 @@ import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -125,6 +125,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
TableManager tblManager;
+ IndexManager idxManager;
+
SqlQueryProcessor queryProc;
/** Test node. */
@@ -159,7 +161,7 @@ public class MockedStructuresTest extends IgniteAbstractTest {
void before() throws NodeStoppingException {
tblManager = mockManagers();
- queryProc = new SqlQueryProcessor(cs, tblManager);
+ queryProc = new SqlQueryProcessor(cs, tblManager, idxManager);
queryProc.start();
}
@@ -378,48 +380,6 @@ public class MockedStructuresTest extends IgniteAbstractTest {
String.format("ALTER TABLE %s DROP COLUMN c4", curMethodName)));
}
- /**
- * Tests create a table through public API.
- */
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-16032")
- @Test
- public void testCreateDropIndex() {
- SqlQueryProcessor finalQueryProc = queryProc;
-
- String curMethodName = getCurrentMethodName();
-
- String newTblSql = String.format("CREATE TABLE %s (c1 int PRIMARY KEY, c2 varbinary(255)) with partitions=1", curMethodName);
-
- queryProc.query("PUBLIC", newTblSql);
-
- assertTrue(tblManager.tables().stream().anyMatch(t -> t.name()
- .equalsIgnoreCase("PUBLIC." + curMethodName)));
-
- queryProc.query("PUBLIC", String.format("CREATE INDEX index1 ON %s (c1)", curMethodName));
-
- queryProc.query("PUBLIC", String.format("CREATE INDEX IF NOT EXISTS index1 ON %s (c1)", curMethodName));
-
- queryProc.query("PUBLIC", String.format("CREATE INDEX index2 ON %s (c1)", curMethodName));
-
- queryProc.query("PUBLIC", String.format("CREATE INDEX index3 ON %s (c2)", curMethodName));
-
- assertThrows(IndexAlreadyExistsException.class, () ->
- finalQueryProc.query("PUBLIC", String.format("CREATE INDEX index3 ON %s (c1)", curMethodName)));
-
- assertThrows(IgniteException.class, () ->
- finalQueryProc.query("PUBLIC", String.format("CREATE INDEX index_3 ON %s (c1)", curMethodName + "_nonExist")));
-
- queryProc.query("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName));
-
- queryProc.query("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName));
-
- queryProc.query("PUBLIC", String.format("CREATE INDEX index4 ON %s (c2 desc, c1 asc)", curMethodName));
-
- queryProc.query("PUBLIC", String.format("DROP INDEX index4 ON %s", curMethodName));
-
- queryProc.query("PUBLIC", String.format("DROP INDEX IF EXISTS index4 ON %s", curMethodName));
- }
-
// todo copy-paste from TableManagerTest will be removed after https://issues.apache.org/jira/browse/IGNITE-16050
/**
* Instantiates a table and prepares Table manager.
@@ -490,6 +450,8 @@ public class MockedStructuresTest extends IgniteAbstractTest {
tm
);
+ idxManager = new IndexManagerImpl(tableManager, mock(TablesConfiguration.class));
+
tableManager.start();
return tableManager;
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
index 07dbbaf..57901c8 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/RuntimeSortedIndexTest.java
@@ -118,6 +118,7 @@ public class RuntimeSortedIndexTest extends IgniteAbstractTest {
ArrayRowHandler.INSTANCE,
ImmutableMap.of()
),
+ rowType,
RelCollations.of(ImmutableIntList.copyOf(idxCols)),
(o1, o2) -> {
for (int colIdx : idxCols) {
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
index bf27768..351071f 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/CorrelatedNestedLoopJoinPlannerTest.java
@@ -26,7 +26,6 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.rel.IgniteIndexScan;
@@ -100,23 +99,21 @@ public class CorrelatedNestedLoopJoinPlannerTest extends AbstractPlannerTest {
IgniteIndexScan idxScan = findFirstNode(phys, byClass(IgniteIndexScan.class));
+ assertTrue(idxScan.indexConditions().keys().contains(1));
+
List<RexNode> lowerBound = idxScan.lowerBound();
assertNotNull(lowerBound, "Invalid plan\n" + RelOptUtil.toString(phys));
- assertEquals(3, lowerBound.size());
+ assertEquals(1, lowerBound.size());
- assertTrue(((RexLiteral) lowerBound.get(0)).isNull());
- assertTrue(((RexLiteral) lowerBound.get(2)).isNull());
- assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
+ assertTrue(lowerBound.get(0) instanceof RexFieldAccess);
List<RexNode> upperBound = idxScan.upperBound();
assertNotNull(upperBound, "Invalid plan\n" + RelOptUtil.toString(phys));
- assertEquals(3, upperBound.size());
+ assertEquals(1, upperBound.size());
- assertTrue(((RexLiteral) upperBound.get(0)).isNull());
- assertTrue(((RexLiteral) upperBound.get(2)).isNull());
- assertTrue(upperBound.get(1) instanceof RexFieldAccess);
+ assertTrue(upperBound.get(0) instanceof RexFieldAccess);
}
/**
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
index 526fef1..68961c9 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/HashIndexSpoolPlannerTest.java
@@ -26,7 +26,6 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.rel.IgniteHashIndexSpool;
@@ -103,11 +102,9 @@ public class HashIndexSpoolPlannerTest extends AbstractPlannerTest {
List<RexNode> searchRow = idxSpool.searchRow();
assertNotNull(searchRow);
- assertEquals(3, searchRow.size());
+ assertEquals(1, searchRow.size());
- assertTrue(((RexLiteral) searchRow.get(0)).isNull());
- assertTrue(((RexLiteral) searchRow.get(2)).isNull());
- assertTrue(searchRow.get(1) instanceof RexFieldAccess);
+ assertTrue(searchRow.get(0) instanceof RexFieldAccess);
}
@Test
@@ -165,12 +162,10 @@ public class HashIndexSpoolPlannerTest extends AbstractPlannerTest {
List<RexNode> searcRow = idxSpool.searchRow();
assertNotNull(searcRow);
- assertEquals(4, searcRow.size());
+ assertEquals(2, searcRow.size());
- assertTrue(((RexLiteral) searcRow.get(0)).isNull());
+ assertTrue(searcRow.get(0) instanceof RexFieldAccess);
assertTrue(searcRow.get(1) instanceof RexFieldAccess);
- assertTrue(searcRow.get(2) instanceof RexFieldAccess);
- assertTrue(((RexLiteral) searcRow.get(3)).isNull());
}
/**
@@ -228,10 +223,8 @@ public class HashIndexSpoolPlannerTest extends AbstractPlannerTest {
List<RexNode> searchRow = idxSpool.searchRow();
assertNotNull(searchRow);
- assertEquals(3, searchRow.size());
+ assertEquals(1, searchRow.size());
- assertTrue(((RexLiteral) searchRow.get(0)).isNull());
- assertTrue(((RexLiteral) searchRow.get(2)).isNull());
- assertTrue(searchRow.get(1) instanceof RexFieldAccess);
+ assertTrue(searchRow.get(0) instanceof RexFieldAccess);
}
}
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
index 621b44c..ebd9a98 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/PlannerTest.java
@@ -98,7 +98,7 @@ public class PlannerTest extends AbstractPlannerTest {
.build()) {
@Override
public IgniteIndex getIndex(String idxName) {
- return new IgniteIndex(null, null, null);
+ return new IgniteIndex(null, (String) null, null);
}
@Override
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
index bbb9e9d..9e6ce34 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/planner/SortedIndexSpoolPlannerTest.java
@@ -26,7 +26,6 @@ import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelDataTypeFactory;
import org.apache.calcite.rex.RexFieldAccess;
-import org.apache.calcite.rex.RexLiteral;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.util.ImmutableIntList;
import org.apache.ignite.internal.sql.engine.rel.IgniteRel;
@@ -98,23 +97,23 @@ public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
IgniteSortedIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteSortedIndexSpool.class));
+ assertNotNull(idxSpool, "Invalid plan: " + RelOptUtil.toString(phys));
+
+ assertTrue(idxSpool.indexCondition().keys().contains(1));
+
List<RexNode> lowerBound = idxSpool.indexCondition().lowerBound();
assertNotNull(lowerBound);
- assertEquals(3, lowerBound.size());
+ assertEquals(1, lowerBound.size());
- assertTrue(((RexLiteral) lowerBound.get(0)).isNull());
- assertTrue(((RexLiteral) lowerBound.get(2)).isNull());
- assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
+ assertTrue(lowerBound.get(0) instanceof RexFieldAccess);
List<RexNode> upperBound = idxSpool.indexCondition().upperBound();
assertNotNull(upperBound);
- assertEquals(3, upperBound.size());
+ assertEquals(1, upperBound.size());
- assertTrue(((RexLiteral) upperBound.get(0)).isNull());
- assertTrue(((RexLiteral) upperBound.get(2)).isNull());
- assertTrue(upperBound.get(1) instanceof RexFieldAccess);
+ assertTrue(upperBound.get(0) instanceof RexFieldAccess);
}
/**
@@ -175,24 +174,20 @@ public class SortedIndexSpoolPlannerTest extends AbstractPlannerTest {
IgniteSortedIndexSpool idxSpool = findFirstNode(phys, byClass(IgniteSortedIndexSpool.class));
+ assertTrue(idxSpool.indexCondition().keys().contains(1));
+
List<RexNode> lowerBound = idxSpool.indexCondition().lowerBound();
assertNotNull(lowerBound);
- assertEquals(4, lowerBound.size());
+ assertEquals(1, lowerBound.size());
- assertTrue(((RexLiteral) lowerBound.get(0)).isNull());
- assertTrue(((RexLiteral) lowerBound.get(2)).isNull());
- assertTrue(((RexLiteral) lowerBound.get(3)).isNull());
- assertTrue(lowerBound.get(1) instanceof RexFieldAccess);
+ assertTrue(lowerBound.get(0) instanceof RexFieldAccess);
List<RexNode> upperBound = idxSpool.indexCondition().upperBound();
assertNotNull(upperBound);
- assertEquals(4, upperBound.size());
+ assertEquals(1, upperBound.size());
- assertTrue(((RexLiteral) upperBound.get(0)).isNull());
- assertTrue(((RexLiteral) lowerBound.get(2)).isNull());
- assertTrue(((RexLiteral) lowerBound.get(3)).isNull());
- assertTrue(upperBound.get(1) instanceof RexFieldAccess);
+ assertTrue(upperBound.get(0) instanceof RexFieldAccess);
}
}
diff --git a/modules/storage-api/pom.xml b/modules/storage-api/pom.xml
index 2543ac8..2bb717b 100644
--- a/modules/storage-api/pom.xml
+++ b/modules/storage-api/pom.xml
@@ -43,6 +43,11 @@
<artifactId>ignite-schema</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-index-api</artifactId>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.apache.ignite</groupId>
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageRowListener.java
similarity index 51%
copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageRowListener.java
index cda9d87..6ae62ba 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/StorageRowListener.java
@@ -15,40 +15,39 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.schema;
+package org.apache.ignite.internal.storage.engine;
-import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
/**
- * Ignite scannable index.
+ * Listen storage changes.
*/
-public class IgniteIndex {
- private final RelCollation collation;
-
- private final String idxName;
-
- // private final GridIndex<H2Row> idx;
- private final InternalIgniteTable tbl;
+public interface StorageRowListener {
+ StorageRowListener NO_OP = new StorageRowListener() {
+ @Override
+ public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow) {
+ // No-op.
+ }
+
+ @Override
+ public void onRemove(BinaryRow row) {
+ // No-op.
+ }
+ };
/**
- * Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Called when row is updated.
+ *
+ * @param oldRow Old row.
+ * @param newRow New row.
*/
- public IgniteIndex(RelCollation collation, String name, InternalIgniteTable tbl) {
- this.collation = collation;
- idxName = name;
- this.tbl = tbl;
- }
-
- public RelCollation collation() {
- return collation;
- }
+ void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow);
- public String name() {
- return idxName;
- }
-
- public InternalIgniteTable table() {
- return tbl;
- }
+ /**
+ * Called when row is removed.
+ *
+ * @param row Removed row.
+ */
+ void onRemove(BinaryRow row);
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
index 549c34e..43bdf33 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/engine/TableStorage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.storage.engine;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
@@ -62,12 +63,12 @@ public interface TableStorage {
* <p>A prerequisite for calling this method is to have the index already configured under the same name in the Table Configuration
* (see {@link #configuration()}).
*
- * @param indexName Index name.
+ * @param desc Index descriptor.
* @return Sorted Index storage.
* @throws StorageException if no index has been configured under the given name or it has been configured incorrectly (e.g. it was
* configured as a Hash Index).
*/
- SortedIndexStorage getOrCreateSortedIndex(String indexName);
+ SortedIndexStorage createSortedIndex(SortedIndexDescriptor desc);
/**
* Destroys the index under the given name and all data in it.
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java
index 5213d9c..cb158cc 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRow.java
@@ -17,26 +17,34 @@
package org.apache.ignite.internal.storage.index;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.PartitionStorage;
-import org.apache.ignite.internal.storage.SearchRow;
/**
* Represents an Index Row - a set of indexed columns and Primary Key columns (for key uniqueness).
*/
public interface IndexRow {
/**
- * Returns the serialized presentation of this row as a byte array.
+ * Returns the Primary Key that is a part of this row.
+ *
+ * <p>This is a convenience method for easier extraction of the Primary Key to use it for accessing the {@link PartitionStorage}.
*
- * @return Serialized byte array value.
+ * @return Primary key of the associated {@link PartitionStorage}.
*/
- byte[] rowBytes();
+ BinaryRow primaryKey();
/**
- * Returns the Primary Key that is a part of this row.
+ * Returns the partition number corresponds to PK.
*
- * <p>This is a convenience method for easier extraction of the Primary Key to use it for accessing the {@link PartitionStorage}.
+ * @return Partition number corresponds to PK.
+ */
+ int partition();
+
+ /**
+ * Returns value of the indexed columns specified by index column order.
*
- * @return Primary key of the associated {@link PartitionStorage}.
+ * @param idxColOrder Index column order.
+ * @return Value of the indexed columns.
*/
- SearchRow primaryKey();
+ Object value(int idxColOrder);
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowPrefix.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowPrefix.java
index 602cfaf..c041bf3 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowPrefix.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowPrefix.java
@@ -18,12 +18,21 @@
package org.apache.ignite.internal.storage.index;
/**
- * Represents an Index row prefix, used to perform prefix scans over the Sorted Index storage.
+ * Represents an Index Row - a set of indexed columns and Primary Key columns (for key uniqueness).
*/
public interface IndexRowPrefix {
/**
- * Returns a list of column values that comprise a prefix of an Index row. Values will be sorted in the same order as the
- * Sorted Index columns, specified by {@link SortedIndexDescriptor#indexRowColumns()}.
+ * Returns value of the indexed columns specified by index column order.
+ *
+ * @param idxColOrder Index column order.
+ * @return Value of the indexed columns.
*/
- Object[] prefixColumnValues();
+ Object value(int idxColOrder);
+
+ /**
+ * Returns prefix length.
+ *
+ * @return prefix length.
+ */
+ int length();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexSchemaDescriptor.java
similarity index 57%
copy from modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
copy to modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexSchemaDescriptor.java
index 9d7ca85..bc4763e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/manager/Event.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexSchemaDescriptor.java
@@ -15,12 +15,25 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.manager;
+package org.apache.ignite.internal.storage.index;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
/**
- * The event cas which is produced by event producer component.
- *
- * @see Producer#fireEvent(Event, EventParameters, Throwable)
+ * Index column collation: direction and NULLs order that a column's value is ordered in.
*/
-public interface Event {
+public class IndexSchemaDescriptor extends SchemaDescriptor {
+ private static final Column[] EMPTY_COLUMNS_ARRAY = new Column[0];
+
+ public IndexSchemaDescriptor(Column[] cols) {
+ super(0, cols, EMPTY_COLUMNS_ARRAY);
+ }
+
+ /**
+ * Returns index columns.
+ */
+ public Column[] columns() {
+ return keyColumns().columns();
+ }
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
deleted file mode 100644
index ac25744..0000000
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexDescriptor.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.index;
-
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toUnmodifiableList;
-
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.stream.Stream;
-import org.apache.ignite.configuration.NamedListView;
-import org.apache.ignite.configuration.schemas.table.ColumnView;
-import org.apache.ignite.configuration.schemas.table.IndexColumnView;
-import org.apache.ignite.configuration.schemas.table.SortedIndexView;
-import org.apache.ignite.configuration.schemas.table.TableIndexView;
-import org.apache.ignite.configuration.schemas.table.TableView;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter;
-import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
-import org.apache.ignite.internal.storage.StorageException;
-import org.apache.ignite.internal.tostring.S;
-import org.apache.ignite.schema.definition.ColumnDefinition;
-
-/**
- * Descriptor for creating a Sorted Index Storage.
- *
- * @see SortedIndexStorage
- */
-public class SortedIndexDescriptor {
- /**
- * Descriptor of a Sorted Index column (column name and column sort order).
- */
- public static class ColumnDescriptor {
- private final Column column;
-
- private final boolean asc;
-
- private final boolean indexedColumn;
-
- private final boolean nullable;
-
- ColumnDescriptor(Column column, boolean asc, boolean indexedColumn, boolean nullable) {
- this.column = column;
- this.asc = asc;
- this.indexedColumn = indexedColumn;
- this.nullable = nullable;
- }
-
- /**
- * Returns a column descriptor.
- */
- public Column column() {
- return column;
- }
-
- /**
- * Returns {@code true} if this column is sorted in ascending order or {@code false} otherwise.
- */
- public boolean asc() {
- return asc;
- }
-
- /**
- * Returns {@code true} if this column was explicitly marked as an indexed column or {@code false} if it is a part of a Primary Key
- * appended for uniqueness.
- */
- public boolean indexedColumn() {
- return indexedColumn;
- }
-
- /**
- * Returns {@code true} if this column can contain null values or {@code false} otherwise.
- */
- public boolean nullable() {
- return nullable;
- }
-
- @Override
- public String toString() {
- return S.toString(this);
- }
- }
-
- private final String name;
-
- private final List<ColumnDescriptor> columns;
-
- private final SchemaDescriptor schemaDescriptor;
-
- /**
- * Creates an Index Descriptor from a given Table Configuration.
- *
- * @param name index name.
- * @param tableConfig table configuration.
- */
- public SortedIndexDescriptor(String name, TableView tableConfig) {
- this.name = name;
-
- TableIndexView indexConfig = tableConfig.indices().get(name);
-
- if (indexConfig == null) {
- throw new StorageException(String.format("Index configuration for \"%s\" could not be found", name));
- }
-
- if (!(indexConfig instanceof SortedIndexView)) {
- throw new StorageException(String.format(
- "Index \"%s\" is not configured as a Sorted Index. Actual type: %s",
- name, indexConfig.type()
- ));
- }
-
- // extract indexed column configurations from the table configuration
- NamedListView<? extends IndexColumnView> indexColumns = ((SortedIndexView) indexConfig).columns();
-
- Stream<String> indexColumnNames = indexColumns.namedListKeys().stream();
-
- // append the primary key to guarantee index key uniqueness
- Stream<String> primaryKeyColumnNames = Arrays.stream(tableConfig.primaryKey().columns());
-
- List<ColumnView> indexKeyColumnViews = Stream.concat(indexColumnNames, primaryKeyColumnNames)
- .distinct() // remove Primary Key columns if they are already present in the index definition
- .map(columnName -> {
- ColumnView columnView = tableConfig.columns().get(columnName);
-
- assert columnView != null : "Incorrect index column configuration. " + columnName + " column does not exist";
-
- return columnView;
- })
- .collect(toList());
-
- schemaDescriptor = createSchemaDescriptor(indexKeyColumnViews);
-
- columns = Arrays.stream(schemaDescriptor.keyColumns().columns())
- .sorted(Comparator.comparingInt(Column::columnOrder))
- .map(column -> {
- IndexColumnView columnView = indexColumns.get(column.name());
-
- // if the index config does not contain this column - it's a column from the Primary Key
- boolean indexedColumn = columnView != null;
-
- // PK columns are always sorted in ascending order
- boolean asc = !indexedColumn || columnView.asc();
-
- return new ColumnDescriptor(column, asc, indexedColumn, column.nullable());
- })
- .collect(toUnmodifiableList());
- }
-
- /**
- * Creates a {@link SchemaDescriptor} from a list of index key columns.
- */
- private static SchemaDescriptor createSchemaDescriptor(List<ColumnView> indexKeyColumnViews) {
- Column[] keyColumns = new Column[indexKeyColumnViews.size()];
-
- for (int i = 0; i < indexKeyColumnViews.size(); ++i) {
- ColumnView columnView = indexKeyColumnViews.get(i);
-
- ColumnDefinition columnDefinition = SchemaConfigurationConverter.convert(columnView);
-
- keyColumns[i] = SchemaDescriptorConverter.convert(i, columnDefinition);
- }
-
- return new SchemaDescriptor(0, keyColumns, new Column[0]);
- }
-
- /**
- * Returns this index' name.
- */
- public String name() {
- return name;
- }
-
- /**
- * Returns the Column Descriptors that comprise a row of this index (indexed columns + primary key columns).
- */
- public List<ColumnDescriptor> indexRowColumns() {
- return columns;
- }
-
- /**
- * Converts this Descriptor into an equivalent {@link SchemaDescriptor}.
- *
- * <p>The resulting {@code SchemaDescriptor} will have empty {@link SchemaDescriptor#valueColumns()} and its
- * {@link SchemaDescriptor#keyColumns()} will be consistent with the columns returned by {@link #indexRowColumns()}.
- */
- public SchemaDescriptor asSchemaDescriptor() {
- return schemaDescriptor;
- }
-}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
index bcbffe2..bbbe9a8 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
+++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/SortedIndexStorage.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.storage.index;
+import java.util.function.Predicate;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.SearchRow;
import org.apache.ignite.internal.util.Cursor;
@@ -35,16 +37,6 @@ public interface SortedIndexStorage extends AutoCloseable {
SortedIndexDescriptor indexDescriptor();
/**
- * Returns a factory for creating index rows for this storage.
- */
- IndexRowFactory indexRowFactory();
-
- /**
- * Returns a class deserializing index columns.
- */
- IndexRowDeserializer indexRowDeserializer();
-
- /**
* Adds the given index key and {@link SearchRow} to the index.
*
* <p>Putting a new value under the same key will overwrite the previous associated value.
@@ -60,9 +52,12 @@ public interface SortedIndexStorage extends AutoCloseable {
/**
* Returns a range of index values between the lower bound (inclusive) and the upper bound (inclusive).
+ *
+ * @param low Lower bound of the range to scan.
+ * @param up Upper bound of the range to scan.
+ * @param filter Row filter. Basically it is used to filter by partition number.
*/
- // TODO: add options https://issues.apache.org/jira/browse/IGNITE-16059
- Cursor<IndexRow> range(IndexRowPrefix lowerBound, IndexRowPrefix upperBound);
+ Cursor<IndexRow> range(IndexRowPrefix low, IndexRowPrefix up, Predicate<IndexRow> filter);
/**
* Removes all data in this index and frees the associated resources.
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
index 094e416..bc33a2a 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorage.java
@@ -20,7 +20,6 @@ package org.apache.ignite.internal.storage.rocksdb;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.PARTITION_CF_NAME;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.columnFamilyType;
import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexCfName;
-import static org.apache.ignite.internal.storage.rocksdb.ColumnFamilyUtils.sortedIndexName;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -35,15 +34,16 @@ import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.engine.DataRegion;
import org.apache.ignite.internal.storage.engine.TableStorage;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.index.BinaryRowComparator;
import org.apache.ignite.internal.storage.rocksdb.index.RocksDbSortedIndexStorage;
+import org.apache.ignite.internal.storage.rocksdb.index.SortedIndexStorageDescriptor;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.NotNull;
@@ -172,11 +172,10 @@ class RocksDbTableStorage implements TableStorage {
break;
case SORTED_INDEX:
- String indexName = sortedIndexName(handleName);
-
- var indexDescriptor = new SortedIndexDescriptor(indexName, tableCfg.value());
-
- sortedIndices.put(indexName, new RocksDbSortedIndexStorage(cf, indexDescriptor));
+ // TODO: restore index on restart.
+ //String indexName = sortedIndexName(handleName);
+ //var indexDescriptor = new MySortedIndexDescriptor(indexName, tableCfg.value());
+ //sortedIndices.put(indexName, new RocksDbSortedIndexStorage(cf, indexDescriptor));
break;
@@ -289,17 +288,16 @@ class RocksDbTableStorage implements TableStorage {
}
@Override
- public SortedIndexStorage getOrCreateSortedIndex(String indexName) {
+ public SortedIndexStorage createSortedIndex(SortedIndexDescriptor desc) {
assert !stopped : "Storage has been stopped";
- return sortedIndices.computeIfAbsent(indexName, name -> {
- var indexDescriptor = new SortedIndexDescriptor(name, tableCfg.value());
-
- ColumnFamilyDescriptor cfDescriptor = sortedIndexCfDescriptor(indexDescriptor);
+ SortedIndexStorageDescriptor idxStoreDesc = new SortedIndexStorageDescriptor(desc);
+ return sortedIndices.computeIfAbsent(desc.name(), name -> {
+ ColumnFamilyDescriptor cfDescriptor = sortedIndexCfDescriptor(idxStoreDesc);
ColumnFamily cf = createColumnFamily(sortedIndexCfName(name), cfDescriptor);
- return new RocksDbSortedIndexStorage(cf, indexDescriptor);
+ return new RocksDbSortedIndexStorage(cf, idxStoreDesc);
});
}
@@ -392,9 +390,13 @@ class RocksDbTableStorage implements TableStorage {
return new ColumnFamilyDescriptor(cfName.getBytes(StandardCharsets.UTF_8), new ColumnFamilyOptions());
case SORTED_INDEX:
- var indexDescriptor = new SortedIndexDescriptor(sortedIndexName(cfName), tableCfg.value());
+ // TODO: resore index on restart
+ // var indexDescriptor = new SortedIndexDescriptor(sortedIndexName(cfName), tableCfg.value());
+ // return sortedIndexCfDescriptor(indexDescriptor);
+
+ assert false : "Restore index not supported";
- return sortedIndexCfDescriptor(indexDescriptor);
+ return null;
default:
throw new StorageException("Unidentified column family [name=" + cfName + ", table=" + tableCfg.name() + ']');
@@ -411,7 +413,7 @@ class RocksDbTableStorage implements TableStorage {
/**
* Creates a Column Family descriptor for a Sorted Index.
*/
- private static ColumnFamilyDescriptor sortedIndexCfDescriptor(SortedIndexDescriptor descriptor) {
+ private static ColumnFamilyDescriptor sortedIndexCfDescriptor(SortedIndexStorageDescriptor descriptor) {
String cfName = sortedIndexCfName(descriptor.name());
ColumnFamilyOptions options = new ColumnFamilyOptions().setComparator(new BinaryRowComparator(descriptor));
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowDeserializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowDeserializer.java
deleted file mode 100644
index 9cba543..0000000
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowDeserializer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.rocksdb.index;
-
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.storage.index.IndexRow;
-import org.apache.ignite.internal.storage.index.IndexRowDeserializer;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
-
-/**
- * {@link IndexRowDeserializer} implementation that uses {@link BinaryRow} infrastructure for deserialization purposes.
- */
-class BinaryIndexRowDeserializer implements IndexRowDeserializer {
- private final SortedIndexDescriptor descriptor;
-
- BinaryIndexRowDeserializer(SortedIndexDescriptor descriptor) {
- this.descriptor = descriptor;
- }
-
- @Override
- public Object[] indexedColumnValues(IndexRow indexRow) {
- var row = new Row(descriptor.asSchemaDescriptor(), new ByteBufferRow(indexRow.rowBytes()));
-
- return descriptor.indexRowColumns().stream()
- .filter(ColumnDescriptor::indexedColumn)
- .map(ColumnDescriptor::column)
- .map(column -> column.type().spec().objectValue(row, column.schemaIndex()))
- .toArray();
- }
-}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowFactory.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowFactory.java
deleted file mode 100644
index d8a09d6..0000000
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowFactory.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.rocksdb.index;
-
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.row.RowAssembler;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.index.IndexRow;
-import org.apache.ignite.internal.storage.index.IndexRowFactory;
-import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-
-/**
- * {@link IndexRowFactory} implementation that uses {@link BinaryRow} as the index keys serialization mechanism.
- */
-class BinaryIndexRowFactory implements IndexRowFactory {
- private final SortedIndexDescriptor descriptor;
-
- BinaryIndexRowFactory(SortedIndexDescriptor descriptor) {
- this.descriptor = descriptor;
- }
-
- @Override
- public IndexRow createIndexRow(Object[] columnValues, SearchRow primaryKey) {
- if (columnValues.length != descriptor.indexRowColumns().size()) {
- throw new IllegalArgumentException(String.format(
- "Incorrect number of column values passed. Expected %d, got %d",
- descriptor.indexRowColumns().size(),
- columnValues.length
- ));
- }
-
- RowAssembler rowAssembler = createRowAssembler(columnValues);
-
- for (Column column : descriptor.asSchemaDescriptor().keyColumns().columns()) {
- Object columnValue = columnValues[column.columnOrder()];
-
- RowAssembler.writeValue(rowAssembler, column, columnValue);
- }
-
- return new BinaryIndexRow(rowAssembler.build(), primaryKey);
- }
-
- @Override
- public IndexRowPrefix createIndexRowPrefix(Object[] prefixColumnValues) {
- if (prefixColumnValues.length > descriptor.indexRowColumns().size()) {
- throw new IllegalArgumentException(String.format(
- "Incorrect number of column values passed. Expected not more than %d, got %d",
- descriptor.indexRowColumns().size(),
- prefixColumnValues.length
- ));
- }
-
- return () -> prefixColumnValues;
- }
-
- /**
- * Creates a {@link RowAssembler} that can later be used to serialized the given column mapping.
- */
- private RowAssembler createRowAssembler(Object[] rowColumns) {
- SchemaDescriptor schemaDescriptor = descriptor.asSchemaDescriptor();
-
- int nonNullVarlenKeyCols = 0;
-
- for (Column column : schemaDescriptor.keyColumns().columns()) {
- Object columnValue = rowColumns[column.columnOrder()];
-
- if (!column.type().spec().fixedLength() && columnValue != null) {
- nonNullVarlenKeyCols += 1;
- }
- }
-
- return new RowAssembler(schemaDescriptor, nonNullVarlenKeyCols, 0);
- }
-}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowSerializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowSerializer.java
new file mode 100644
index 0000000..437faf3
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRowSerializer.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.storage.index.IndexRow;
+
+/**
+ * {@link IndexRowDeserializer} implementation that uses {@link BinaryRow} infrastructure for deserialization purposes.
+ */
+class BinaryIndexRowSerializer implements IndexRowSerializer, IndexRowDeserializer {
+ private final SortedIndexStorageDescriptor descriptor;
+
+ BinaryIndexRowSerializer(SortedIndexStorageDescriptor descriptor) {
+ this.descriptor = descriptor;
+ }
+
+ @Override
+ public IndexRow deserialize(IndexBinaryRow binRow) {
+ return new IndexRowImpl(binRow.keySlice(), binRow.valueSlice(), descriptor);
+ }
+
+ @Override
+ public IndexBinaryRow serialize(IndexRow row) {
+ RowAssembler rowAssembler = createRowAssembler(row);
+
+ for (Column column : descriptor.schema().keyColumns().columns()) {
+ Object columnValue = row.value(column.columnOrder());
+
+ RowAssembler.writeValue(rowAssembler, column, columnValue);
+ }
+
+ return new IndexBinaryRowImpl(rowAssembler.build(), row.primaryKey());
+ }
+
+ /**
+ * Creates a {@link RowAssembler} that can later be used to serialized the given column mapping.
+ */
+ private RowAssembler createRowAssembler(IndexRow row) {
+ SchemaDescriptor schemaDescriptor = descriptor.schema();
+
+ int nonNullVarlenKeyCols = 0;
+
+ for (Column column : schemaDescriptor.keyColumns().columns()) {
+ Object columnValue = row.value(column.columnOrder());
+
+ if (!column.type().spec().fixedLength() && columnValue != null) {
+ nonNullVarlenKeyCols += 1;
+ }
+ }
+
+ return new RowAssembler(schemaDescriptor, nonNullVarlenKeyCols, 0);
+ }
+}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java
index ad47a27..8c526f2 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryRowComparator.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypeSpec;
import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.rocksdb.AbstractComparator;
import org.rocksdb.ComparatorOptions;
@@ -53,14 +52,14 @@ public class BinaryRowComparator extends AbstractComparator {
/**
* Creates a RocksDB comparator for a Sorted Index identified by the given descriptor.
*/
- public BinaryRowComparator(SortedIndexDescriptor descriptor) {
+ public BinaryRowComparator(SortedIndexStorageDescriptor descriptor) {
this(descriptor, new ComparatorOptions());
}
/**
* Internal constructor for capturing the {@code options} parameter for resource management purposes.
*/
- private BinaryRowComparator(SortedIndexDescriptor descriptor, ComparatorOptions options) {
+ private BinaryRowComparator(SortedIndexStorageDescriptor descriptor, ComparatorOptions options) {
super(options);
innerComparator = comparing(
@@ -74,9 +73,9 @@ public class BinaryRowComparator extends AbstractComparator {
/**
* Creates a comparator for comparing two {@link BinaryRow}s by converting them into {@link Row}s.
*/
- private static Comparator<BinaryRow> binaryRowComparator(SortedIndexDescriptor descriptor) {
+ private static Comparator<BinaryRow> binaryRowComparator(SortedIndexStorageDescriptor descriptor) {
return comparing(
- binaryRow -> new Row(descriptor.asSchemaDescriptor(), binaryRow),
+ binaryRow -> new Row(descriptor.schema(), binaryRow),
rowComparator(descriptor)
);
}
@@ -84,8 +83,8 @@ public class BinaryRowComparator extends AbstractComparator {
/**
* Creates a comparator that compares two {@link Row}s by comparing individual columns.
*/
- private static Comparator<Row> rowComparator(SortedIndexDescriptor descriptor) {
- return descriptor.indexRowColumns().stream()
+ private static Comparator<Row> rowComparator(SortedIndexStorageDescriptor descriptor) {
+ return descriptor.indexColumns().stream()
.map(columnDescriptor -> {
Column column = columnDescriptor.column();
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRow.java
similarity index 68%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
copy to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRow.java
index cf860d6..80d5aac 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRow.java
@@ -15,17 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.index;
+package org.apache.ignite.internal.storage.rocksdb.index;
/**
- * Class for extracting indexed column values from an {@link IndexRow}.
+ * Represents an Index Row - a set of indexed columns and Primary Key columns (for key uniqueness).
*/
-public interface IndexRowDeserializer {
+public interface IndexBinaryRow {
/**
- * De-serializes column values that were used to create the index.
- *
- * @param indexRow Index row.
- * @return Values of the indexed columns.
+ * Get ByteBuffer slice representing the key chunk.
*/
- Object[] indexedColumnValues(IndexRow indexRow);
+ byte[] keySlice();
+
+ /**
+ * Get ByteBuffer slice representing the value chunk.
+ */
+ byte[] valueSlice();
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRow.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRowImpl.java
similarity index 68%
rename from modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRow.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRowImpl.java
index 9f1531b..d902810 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/BinaryIndexRow.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexBinaryRowImpl.java
@@ -19,37 +19,33 @@ package org.apache.ignite.internal.storage.rocksdb.index;
import java.util.Arrays;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
/**
- * {@link IndexRow} implementation that uses {@link BinaryRow} serialization.
+ * Implementation of the {@link IndexBinaryRow}.
*/
-class BinaryIndexRow implements IndexRow {
+class IndexBinaryRowImpl implements IndexBinaryRow {
private final byte[] bytes;
- private final SearchRow pk;
+ private final BinaryRow pk;
- BinaryIndexRow(byte[] bytes, byte[] pkBytes) {
+ /**
+ * Constructor.
+ */
+ IndexBinaryRowImpl(byte[] bytes, byte[] pkBytes) {
this.bytes = bytes;
- this.pk = new ByteArraySearchRow(pkBytes);
+ this.pk = new ByteBufferRow(pkBytes);
}
- BinaryIndexRow(BinaryRow row, SearchRow primaryKey) {
+ /**
+ * Constructor.
+ */
+ IndexBinaryRowImpl(BinaryRow row, BinaryRow pk) {
this.bytes = row.bytes();
- this.pk = primaryKey;
- }
-
- @Override
- public byte[] rowBytes() {
- return bytes;
- }
-
- @Override
- public SearchRow primaryKey() {
- return pk;
+ this.pk = pk;
}
+ /** {@inheritDoc} */
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -58,12 +54,25 @@ class BinaryIndexRow implements IndexRow {
if (o == null || getClass() != o.getClass()) {
return false;
}
- BinaryIndexRow that = (BinaryIndexRow) o;
+ IndexBinaryRowImpl that = (IndexBinaryRowImpl) o;
return Arrays.equals(bytes, that.bytes);
}
+ /** {@inheritDoc} */
@Override
public int hashCode() {
return Arrays.hashCode(bytes);
}
+
+ /** {@inheritDoc} */
+ @Override
+ public byte[] keySlice() {
+ return bytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte[] valueSlice() {
+ return pk.bytes();
+ }
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowDeserializer.java
similarity index 85%
copy from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
copy to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowDeserializer.java
index cf860d6..afbbe9f 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowDeserializer.java
@@ -15,10 +15,12 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.index;
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.storage.index.IndexRow;
/**
- * Class for extracting indexed column values from an {@link IndexRow}.
+ * Class for extracting indexed column values from an {@link IndexBinaryRow}.
*/
public interface IndexRowDeserializer {
/**
@@ -27,5 +29,5 @@ public interface IndexRowDeserializer {
* @param indexRow Index row.
* @return Values of the indexed columns.
*/
- Object[] indexedColumnValues(IndexRow indexRow);
+ IndexRow deserialize(IndexBinaryRow indexRow);
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowImpl.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowImpl.java
new file mode 100644
index 0000000..0ed0d8e
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.storage.index.IndexRow;
+
+/**
+ * {@link IndexRow} implementation.
+ */
+class IndexRowImpl implements IndexRow {
+ private final Row keyRow;
+
+ private final byte[] valBytes;
+
+ private final SortedIndexStorageDescriptor desc;
+
+ /**
+ * Creates index row by index data schema over the bytes.
+ *
+ * @param desc Index descriptor.
+ */
+ IndexRowImpl(byte[] keyBytes, byte[] valBytes, SortedIndexStorageDescriptor desc) {
+ this.desc = desc;
+
+ keyRow = new Row(desc.schema(), new ByteBufferRow(keyBytes));
+ this.valBytes = valBytes;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Object value(int idxColOrder) {
+ Column c = desc.indexColumns().get(idxColOrder).column();
+
+ switch (c.type().spec()) {
+ case INT8:
+ return keyRow.byteValueBoxed(c.schemaIndex());
+
+ case INT16:
+ return keyRow.shortValueBoxed(c.schemaIndex());
+
+ case INT32:
+ return keyRow.intValueBoxed(c.schemaIndex());
+
+ case INT64:
+ return keyRow.longValueBoxed(c.schemaIndex());
+
+ case FLOAT:
+ return keyRow.floatValueBoxed(c.schemaIndex());
+
+ case DOUBLE:
+ return keyRow.doubleValueBoxed(c.schemaIndex());
+
+ case DECIMAL:
+ return keyRow.decimalValue(c.schemaIndex());
+
+ case UUID:
+ return keyRow.uuidValue(c.schemaIndex());
+
+ case STRING:
+ return keyRow.stringValue(c.schemaIndex());
+
+ case BYTES:
+ return keyRow.bytesValue(c.schemaIndex());
+
+ case BITMASK:
+ return keyRow.bitmaskValue(c.schemaIndex());
+
+ case NUMBER:
+ return keyRow.numberValue(c.schemaIndex());
+
+ case DATE:
+ return keyRow.dateValue(c.schemaIndex());
+
+ case TIME:
+ return keyRow.timeValue(c.schemaIndex());
+
+ case DATETIME:
+ return keyRow.dateTimeValue(c.schemaIndex());
+
+ case TIMESTAMP:
+ return keyRow.timestampValue(c.schemaIndex());
+
+ default:
+ throw new IllegalStateException("Unexpected value: " + c.type().spec());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public BinaryRow primaryKey() {
+ return new ByteBufferRow(valBytes);
+ }
+
+ @Override
+ public int partition() {
+ return 0;
+ }
+}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowSerializer.java
similarity index 79%
rename from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
rename to modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowSerializer.java
index cf860d6..9d00c59 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowDeserializer.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowSerializer.java
@@ -15,17 +15,19 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.index;
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import org.apache.ignite.internal.storage.index.IndexRow;
/**
- * Class for extracting indexed column values from an {@link IndexRow}.
+ * Class for extracting indexed column values from an {@link IndexBinaryRow}.
*/
-public interface IndexRowDeserializer {
+public interface IndexRowSerializer {
/**
- * De-serializes column values that were used to create the index.
+ * Serializes index row.
*
* @param indexRow Index row.
* @return Values of the indexed columns.
*/
- Object[] indexedColumnValues(IndexRow indexRow);
+ IndexBinaryRow serialize(IndexRow indexRow);
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
index 2001d77..414fdfb 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/PrefixComparator.java
@@ -19,21 +19,20 @@ package org.apache.ignite.internal.storage.rocksdb.index;
import java.util.Arrays;
import java.util.BitSet;
+import org.apache.ignite.internal.idx.SortedIndexColumnDescriptor;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypeSpec;
import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
import org.jetbrains.annotations.Nullable;
/**
* Class for comparing a {@link BinaryRow} representing an Index Key with a given prefix of index columns.
*/
class PrefixComparator {
- private final SortedIndexDescriptor descriptor;
- private final @Nullable Object[] prefix;
+ private final SortedIndexStorageDescriptor descriptor;
+ private final @Nullable IndexRowPrefix prefix;
/**
* Creates a new prefix comparator.
@@ -41,11 +40,11 @@ class PrefixComparator {
* @param descriptor Index Descriptor of the enclosing index.
* @param prefix Prefix to compare the incoming rows against.
*/
- PrefixComparator(SortedIndexDescriptor descriptor, IndexRowPrefix prefix) {
- assert descriptor.indexRowColumns().size() >= prefix.prefixColumnValues().length;
+ PrefixComparator(SortedIndexStorageDescriptor descriptor, IndexRowPrefix prefix) {
+ assert descriptor.columns().size() >= prefix.length();
this.descriptor = descriptor;
- this.prefix = prefix.prefixColumnValues();
+ this.prefix = prefix;
}
/**
@@ -57,12 +56,12 @@ class PrefixComparator {
* a value greater than {@code 0} if the row's prefix is larger than the prefix.
*/
int compare(BinaryRow binaryRow) {
- var row = new Row(descriptor.asSchemaDescriptor(), binaryRow);
+ var row = new Row(descriptor.schema(), binaryRow);
- for (int i = 0; i < prefix.length; ++i) {
- ColumnDescriptor columnDescriptor = descriptor.indexRowColumns().get(i);
+ for (int i = 0; i < prefix.length(); ++i) {
+ SortedIndexColumnDescriptor columnDescriptor = descriptor.indexColumns().get(i);
- int compare = compare(columnDescriptor.column(), row, prefix[i]);
+ int compare = compare(columnDescriptor.column(), row, prefix.value(i));
if (compare != 0) {
return columnDescriptor.asc() ? compare : -compare;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
index 6d7b6e5..b55ea8e 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorage.java
@@ -17,15 +17,13 @@
package org.apache.ignite.internal.storage.rocksdb.index;
+import java.util.function.Predicate;
import org.apache.ignite.internal.rocksdb.ColumnFamily;
import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter;
import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.IndexRow;
-import org.apache.ignite.internal.storage.index.IndexRowDeserializer;
-import org.apache.ignite.internal.storage.index.IndexRowFactory;
import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -38,72 +36,67 @@ import org.rocksdb.RocksIterator;
public class RocksDbSortedIndexStorage implements SortedIndexStorage {
private final ColumnFamily indexCf;
- private final SortedIndexDescriptor descriptor;
-
- private final IndexRowFactory indexRowFactory;
+ private final SortedIndexStorageDescriptor descriptor;
private final IndexRowDeserializer indexRowDeserializer;
+ private final IndexRowSerializer indexRowSerializer;
+
/**
* Creates a new Index storage.
*
* @param indexCf Column Family for storing the data.
* @param descriptor Index descriptor.
*/
- public RocksDbSortedIndexStorage(ColumnFamily indexCf, SortedIndexDescriptor descriptor) {
+ public RocksDbSortedIndexStorage(ColumnFamily indexCf, SortedIndexStorageDescriptor descriptor) {
this.indexCf = indexCf;
this.descriptor = descriptor;
- this.indexRowFactory = new BinaryIndexRowFactory(descriptor);
- this.indexRowDeserializer = new BinaryIndexRowDeserializer(descriptor);
- }
- @Override
- public SortedIndexDescriptor indexDescriptor() {
- return descriptor;
+ BinaryIndexRowSerializer serializer = new BinaryIndexRowSerializer(descriptor);
+ this.indexRowSerializer = serializer;
+ this.indexRowDeserializer = serializer;
}
@Override
- public IndexRowFactory indexRowFactory() {
- return indexRowFactory;
- }
-
- @Override
- public IndexRowDeserializer indexRowDeserializer() {
- return indexRowDeserializer;
+ public SortedIndexStorageDescriptor indexDescriptor() {
+ return descriptor;
}
@Override
public void put(IndexRow row) {
- assert row.rowBytes().length > 0;
- assert row.primaryKey().keyBytes().length > 0;
+ assert row.primaryKey().bytes().length > 0;
try {
- indexCf.put(row.rowBytes(), row.primaryKey().keyBytes());
+ IndexBinaryRow binRow = indexRowSerializer.serialize(row);
+
+ indexCf.put(binRow.keySlice(), binRow.valueSlice());
} catch (RocksDBException e) {
throw new StorageException("Error while adding data to Rocks DB", e);
}
}
@Override
- public void remove(IndexRow key) {
+ public void remove(IndexRow row) {
try {
- indexCf.delete(key.rowBytes());
+ IndexBinaryRow binRow = indexRowSerializer.serialize(row);
+
+ indexCf.delete(binRow.keySlice());
} catch (RocksDBException e) {
throw new StorageException("Error while removing data from Rocks DB", e);
}
}
@Override
- public Cursor<IndexRow> range(IndexRowPrefix lowerBound, IndexRowPrefix upperBound) {
+ public Cursor<IndexRow> range(IndexRowPrefix low, IndexRowPrefix up, Predicate<IndexRow> filter) {
RocksIterator iter = indexCf.newIterator();
iter.seekToFirst();
return new RocksIteratorAdapter<>(iter) {
@Nullable
- private PrefixComparator lowerBoundComparator = new PrefixComparator(descriptor, lowerBound);
+ private PrefixComparator lowerBoundComparator = low != null ? new PrefixComparator(descriptor, low) : null;
- private final PrefixComparator upperBoundComparator = new PrefixComparator(descriptor, upperBound);
+ private final PrefixComparator upperBoundComparator = up != null ? new PrefixComparator(descriptor, up) : null;
@Override
public boolean hasNext() {
@@ -122,7 +115,7 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
}
}
- return upperBoundComparator.compare(row) <= 0;
+ return upperBoundComparator == null || upperBoundComparator.compare(row) <= 0;
}
return false;
@@ -130,7 +123,7 @@ public class RocksDbSortedIndexStorage implements SortedIndexStorage {
@Override
protected IndexRow decodeEntry(byte[] key, byte[] value) {
- return new BinaryIndexRow(key, value);
+ return indexRowDeserializer.deserialize(new IndexBinaryRowImpl(key, value));
}
};
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/SortedIndexStorageDescriptor.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/SortedIndexStorageDescriptor.java
new file mode 100644
index 0000000..35b42bd
--- /dev/null
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/index/SortedIndexStorageDescriptor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.rocksdb.index;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.idx.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.storage.index.IndexSchemaDescriptor;
+
+/**
+ * Descriptor for creating a Sorted Index Storage.
+ * TODO: IGNITE-16105 Replace sorted index binary storage protocol
+ */
+public class SortedIndexStorageDescriptor extends SortedIndexDescriptor {
+ private final List<SortedIndexColumnDescriptor> idxColumns;
+
+ private final IndexSchemaDescriptor idxSchema;
+
+ /**
+ * Creates an Index Descriptor from a given Table Configuration.
+ */
+ public SortedIndexStorageDescriptor(SortedIndexDescriptor desc) {
+ super(desc.name(), desc.columns(), desc.pkColumns());
+
+ this.idxSchema = new IndexSchemaDescriptor(
+ IntStream.range(0, columns().size())
+ .mapToObj(i -> columns().get(i).column().copyWithOrder(i))
+ .toArray(Column[]::new)
+ );
+
+ idxColumns = Arrays.asList(idxSchema.columns()).stream()
+ .sorted(Comparator.comparing(Column::columnOrder))
+ .map(c -> {
+ SortedIndexColumnDescriptor origDesc = columns().stream()
+ .filter(origCol -> origCol.column().name().equals(c.name()))
+ .findAny()
+ .get();
+
+ return new SortedIndexColumnDescriptor(c, origDesc.collation());
+ })
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Converts this Descriptor into an equivalent {@link SchemaDescriptor}.
+ * TODO: IGNITE-16105 Replace sorted index binary storage protocol
+ *
+ * <p>The resulting {@code SchemaDescriptor} will have empty {@link SchemaDescriptor#valueColumns()} and its
+ * {@link SchemaDescriptor#keyColumns()} will be consistent with the columns returned by {@link #columns()}.
+ */
+ public SchemaDescriptor schema() {
+ return idxSchema;
+ }
+
+ /**
+ * Index columns with {@link Column#schemaIndex()} specified for index storage schema.
+ * TODO: IGNITE-16105 Replace sorted index binary storage protocol
+ */
+ public List<SortedIndexColumnDescriptor> indexColumns() {
+ return idxColumns;
+ }
+}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowWrapper.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowWrapper.java
index 7539ea0..779aec1 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowWrapper.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/IndexRowWrapper.java
@@ -30,12 +30,13 @@ import java.util.Objects;
import java.util.Random;
import java.util.function.Function;
import java.util.stream.IntStream;
+import org.apache.ignite.internal.idx.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.SchemaTestUtils;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
+import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.NotNull;
/**
@@ -45,16 +46,16 @@ class IndexRowWrapper implements Comparable<IndexRowWrapper> {
/**
* Values used to create the Index row.
*/
- private final Object[] columns;
+ private final Tuple row;
- private final IndexRow row;
+ private final IndexRow idxRow;
- private final SortedIndexDescriptor descriptor;
+ private final SortedIndexStorageDescriptor desc;
- IndexRowWrapper(SortedIndexStorage storage, IndexRow row, Object[] columns) {
- this.descriptor = storage.indexDescriptor();
+ IndexRowWrapper(SortedIndexStorage storage, IndexRow idxRow, Tuple row) {
+ this.desc = (SortedIndexStorageDescriptor) storage.indexDescriptor();
+ this.idxRow = idxRow;
this.row = row;
- this.columns = columns;
}
/**
@@ -63,48 +64,49 @@ class IndexRowWrapper implements Comparable<IndexRowWrapper> {
static IndexRowWrapper randomRow(SortedIndexStorage indexStorage) {
var random = new Random();
- Object[] columns = indexStorage.indexDescriptor().indexRowColumns().stream()
- .map(ColumnDescriptor::column)
- .map(column -> generateRandomValue(random, column.type()))
- .toArray();
+ Tuple row = Tuple.create();
- var primaryKey = new ByteArraySearchRow(randomBytes(random, 25));
+ ((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().stream()
+ .map(SortedIndexColumnDescriptor::column)
+ .forEach(column -> row.set(column.name(), generateRandomValue(random, column.type())));
- IndexRow row = indexStorage.indexRowFactory().createIndexRow(columns, primaryKey);
+ var primaryKey = new ByteBufferRow(randomBytes(random, 25));
- return new IndexRowWrapper(indexStorage, row, columns);
+ return new IndexRowWrapper(indexStorage, new TestIndexRow(row, primaryKey, 0), row);
}
/**
* Creates an Index Key prefix of the given length.
*/
IndexRowPrefix prefix(int length) {
- return () -> Arrays.copyOf(columns, length);
- }
-
- IndexRow row() {
- return row;
- }
+ return new IndexRowPrefix() {
+ @Override
+ public Object value(int idxColOrder) {
+ return idxRow.value(idxColOrder);
+ }
- Object[] columns() {
- return columns;
+ @Override
+ public int length() {
+ return length;
+ }
+ };
}
@Override
public int compareTo(@NotNull IndexRowWrapper o) {
- int sizeCompare = Integer.compare(columns.length, o.columns.length);
+ int sizeCompare = Integer.compare(row.columnCount(), o.row.columnCount());
if (sizeCompare != 0) {
return sizeCompare;
}
- for (int i = 0; i < columns.length; ++i) {
- Comparator<Object> comparator = comparator(columns[i].getClass());
+ for (int i = 0; i < row.columnCount(); ++i) {
+ Comparator<Object> comparator = comparator(row.value(i).getClass());
- int compare = comparator.compare(columns[i], o.columns[i]);
+ int compare = comparator.compare(row.value(i), o.row.value(i));
if (compare != 0) {
- boolean asc = descriptor.indexRowColumns().get(i).asc();
+ boolean asc = desc.indexColumns().get(i).asc();
return asc ? compare : -compare;
}
@@ -122,12 +124,12 @@ class IndexRowWrapper implements Comparable<IndexRowWrapper> {
return false;
}
IndexRowWrapper that = (IndexRowWrapper) o;
- return row.equals(that.row);
+ return idxRow.equals(that.idxRow);
}
@Override
public int hashCode() {
- return Objects.hash(row);
+ return Objects.hash(idxRow);
}
/**
@@ -154,4 +156,11 @@ class IndexRowWrapper implements Comparable<IndexRowWrapper> {
.map(Comparable.class::cast)
.toArray(Comparable[]::new);
}
+
+ /**
+ * Returns index row.
+ */
+ public IndexRow row() {
+ return idxRow;
+ }
}
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
index 768e189..7095438 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
@@ -17,26 +17,20 @@
package org.apache.ignite.internal.storage.rocksdb.index;
-import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
import static org.apache.ignite.internal.schema.SchemaTestUtils.generateRandomValue;
import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static org.apache.ignite.schema.SchemaBuilders.column;
import static org.apache.ignite.schema.SchemaBuilders.tableBuilder;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
-import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.sameInstance;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import java.nio.file.Path;
import java.util.ArrayList;
@@ -45,6 +39,7 @@ import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
@@ -55,13 +50,16 @@ import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSch
import org.apache.ignite.configuration.schemas.table.TableConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
-import org.apache.ignite.internal.storage.SearchRow;
-import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.idx.SortedIndexColumnDescriptor;
+import org.apache.ignite.internal.idx.SortedIndexDescriptor;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.configuration.SchemaDescriptorConverter;
import org.apache.ignite.internal.storage.engine.DataRegion;
import org.apache.ignite.internal.storage.engine.TableStorage;
import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexRowPrefix;
-import org.apache.ignite.internal.storage.index.SortedIndexDescriptor.ColumnDescriptor;
import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.testframework.VariableSource;
@@ -70,15 +68,10 @@ import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.lang.IgniteLogger;
-import org.apache.ignite.schema.SchemaBuilders;
import org.apache.ignite.schema.definition.ColumnDefinition;
import org.apache.ignite.schema.definition.ColumnType;
import org.apache.ignite.schema.definition.TableDefinition;
-import org.apache.ignite.schema.definition.builder.SortedIndexDefinitionBuilder;
-import org.apache.ignite.schema.definition.builder.SortedIndexDefinitionBuilder.SortedIndexColumnBuilder;
-import org.apache.ignite.schema.definition.index.ColumnarIndexDefinition;
-import org.apache.ignite.schema.definition.index.HashIndexDefinition;
-import org.apache.ignite.schema.definition.index.SortedIndexDefinition;
+import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -99,24 +92,72 @@ public class RocksDbSortedIndexStorageTest {
* Definitions of all supported column types.
*/
private static final List<ColumnDefinition> ALL_TYPES_COLUMN_DEFINITIONS = allTypesColumnDefinitions();
-
+ /**
+ * List of resources that need to be closed at the end of each test.
+ */
+ private final List<AutoCloseable> resources = new ArrayList<>();
private Random random;
-
@InjectConfiguration(polymorphicExtensions = {
HashIndexConfigurationSchema.class,
SortedIndexConfigurationSchema.class
})
private TableConfiguration tableCfg;
-
/**
* Table Storage for creating indices.
*/
private TableStorage tableStorage;
+ private static List<ColumnDefinition> allTypesColumnDefinitions() {
+ Stream<ColumnType> allColumnTypes = Stream.of(
+ ColumnType.INT8,
+ ColumnType.INT16,
+ ColumnType.INT32,
+ ColumnType.INT64,
+ ColumnType.FLOAT,
+ ColumnType.DOUBLE,
+ ColumnType.UUID,
+ ColumnType.DATE,
+ ColumnType.bitmaskOf(32),
+ ColumnType.string(),
+ ColumnType.blobOf(),
+ ColumnType.numberOf(),
+ ColumnType.decimalOf(),
+ ColumnType.time(),
+ ColumnType.datetime(),
+ ColumnType.timestamp()
+ );
+
+ return allColumnTypes
+ .map(type -> column(type.typeSpec().name(), type).asNullable(true).build())
+ .collect(toUnmodifiableList());
+ }
+
/**
- * List of resources that need to be closed at the end of each test.
+ * Extracts all data from a given cursor and closes it.
*/
- private final List<AutoCloseable> resources = new ArrayList<>();
+ private static <T> List<T> cursorToList(Cursor<T> cursor) throws Exception {
+ try (cursor) {
+ var list = new ArrayList<T>();
+
+ cursor.forEachRemaining(list::add);
+
+ return list;
+ }
+ }
+
+ /**
+ * Extracts a single value by a given key or {@code null} if it does not exist.
+ */
+ @Nullable
+ private static IndexRow getSingle(SortedIndexStorage indexStorage, IndexRowWrapper entry) throws Exception {
+ IndexRowPrefix fullPrefix = entry.prefix(((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().size());
+
+ List<IndexRow> values = cursorToList(indexStorage.range(fullPrefix, fullPrefix, r -> true));
+
+ assertThat(values, anyOf(empty(), hasSize(1)));
+
+ return values.isEmpty() ? null : values.get(0);
+ }
@BeforeEach
void setUp(
@@ -174,31 +215,6 @@ public class RocksDbSortedIndexStorageTest {
assertThat(createTableFuture, willBe(nullValue(Void.class)));
}
- private static List<ColumnDefinition> allTypesColumnDefinitions() {
- Stream<ColumnType> allColumnTypes = Stream.of(
- ColumnType.INT8,
- ColumnType.INT16,
- ColumnType.INT32,
- ColumnType.INT64,
- ColumnType.FLOAT,
- ColumnType.DOUBLE,
- ColumnType.UUID,
- ColumnType.DATE,
- ColumnType.bitmaskOf(32),
- ColumnType.string(),
- ColumnType.blobOf(),
- ColumnType.numberOf(),
- ColumnType.decimalOf(),
- ColumnType.time(),
- ColumnType.datetime(),
- ColumnType.timestamp()
- );
-
- return allColumnTypes
- .map(type -> column(type.typeSpec().name(), type).asNullable(true).build())
- .collect(toUnmodifiableList());
- }
-
@AfterEach
void tearDown() throws Exception {
Collections.reverse(resources);
@@ -213,16 +229,21 @@ public class RocksDbSortedIndexStorageTest {
void testRowSerialization() {
SortedIndexStorage indexStorage = createIndex(ALL_TYPES_COLUMN_DEFINITIONS);
- Object[] columns = indexStorage.indexDescriptor().indexRowColumns().stream()
- .map(ColumnDescriptor::column)
- .map(column -> generateRandomValue(random, column.type()))
- .toArray();
+ Tuple tuple = Tuple.create();
+ ((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().stream()
+ .sequential()
+ .map(SortedIndexColumnDescriptor::column)
+ .forEach(column -> tuple.set(column.name(), generateRandomValue(random, column.type())));
+
+ BinaryIndexRowSerializer ser = new BinaryIndexRowSerializer((SortedIndexStorageDescriptor) indexStorage.indexDescriptor());
- IndexRow row = indexStorage.indexRowFactory().createIndexRow(columns, new ByteArraySearchRow(new byte[0]));
+ IndexBinaryRow binRow = ser.serialize(new TestIndexRow(tuple, new ByteBufferRow(new byte[]{(byte) 0}), 0));
- Object[] actual = indexStorage.indexRowDeserializer().indexedColumnValues(row);
+ IndexRow idxRow = ser.deserialize(binRow);
- assertThat(actual, is(equalTo(columns)));
+ for (int i = 0; i < ((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().size(); ++i) {
+ assertThat(tuple.value(i), is(equalTo(idxRow.value(i))));
+ }
}
/**
@@ -260,7 +281,7 @@ public class RocksDbSortedIndexStorageTest {
return entry;
})
.sorted()
- .collect(toList());
+ .collect(Collectors.toList());
int firstIndex = 3;
int lastIndex = 8;
@@ -268,17 +289,17 @@ public class RocksDbSortedIndexStorageTest {
List<byte[]> expected = entries.stream()
.skip(firstIndex)
.limit(lastIndex - firstIndex + 1)
- .map(e -> e.row().primaryKey().keyBytes())
- .collect(toList());
+ .map(e -> e.row().primaryKey().bytes())
+ .collect(Collectors.toList());
IndexRowPrefix first = entries.get(firstIndex).prefix(3);
IndexRowPrefix last = entries.get(lastIndex).prefix(5);
- List<byte[]> actual = cursorToList(indexStorage.range(first, last))
+ List<byte[]> actual = cursorToList(indexStorage.range(first, last, r -> true))
.stream()
.map(IndexRow::primaryKey)
- .map(SearchRow::keyBytes)
- .collect(toList());
+ .map(BinaryRow::bytes)
+ .collect(Collectors.toList());
assertThat(actual, hasSize(lastIndex - firstIndex + 1));
@@ -308,85 +329,11 @@ public class RocksDbSortedIndexStorageTest {
indexStorage.put(entry1.row());
indexStorage.put(entry2.row());
- List<IndexRow> actual = cursorToList(indexStorage.range(entry2::columns, entry1::columns));
-
- assertThat(actual, is(empty()));
- }
-
- /**
- * Tests creating a index that has not been created through the Configuration framework.
- */
- @Test
- void testCreateMissingIndex() {
- StorageException ex = assertThrows(StorageException.class, () -> tableStorage.getOrCreateSortedIndex("does not exist"));
-
- assertThat(ex.getMessage(), is(equalTo("Index configuration for \"does not exist\" could not be found")));
- }
-
- /**
- * Tests creating a Sorted Index that has been misconfigured as a Hash Index.
- */
- @Test
- void testCreateMisconfiguredIndex() {
- HashIndexDefinition definition = SchemaBuilders.hashIndex("wrong type")
- .withColumns("foo")
- .build();
-
- StorageException ex = assertThrows(StorageException.class, () -> createIndex(definition));
-
- assertThat(ex.getMessage(), is(equalTo("Index \"WRONG TYPE\" is not configured as a Sorted Index. Actual type: HASH")));
- }
-
- /**
- * Tests the {@link TableStorage#dropIndex} functionality.
- */
- @Test
- void testDropIndex() throws Exception {
- SortedIndexStorage storage = createIndex(ALL_TYPES_COLUMN_DEFINITIONS.subList(0, 1));
-
- String indexName = storage.indexDescriptor().name();
-
- assertThat(tableStorage.getOrCreateSortedIndex(indexName), is(sameInstance(storage)));
-
- IndexRowWrapper entry = IndexRowWrapper.randomRow(storage);
-
- storage.put(entry.row());
-
- tableStorage.dropIndex(indexName);
-
- SortedIndexStorage nextStorage = tableStorage.getOrCreateSortedIndex(indexName);
-
- assertThat(nextStorage, is(not(sameInstance(storage))));
- assertThat(getSingle(nextStorage, entry), is(nullValue()));
- }
-
- @ParameterizedTest
- @VariableSource("ALL_TYPES_COLUMN_DEFINITIONS")
- void testNullValues(ColumnDefinition columnDefinition) throws Exception {
- SortedIndexStorage storage = createIndex(List.of(columnDefinition));
-
- IndexRowWrapper entry1 = IndexRowWrapper.randomRow(storage);
-
- Object[] nullArray = storage.indexDescriptor().indexRowColumns().stream()
- .map(columnDescriptor -> columnDescriptor.indexedColumn() ? null : (byte) random.nextInt())
- .toArray();
-
- IndexRow nullRow = storage.indexRowFactory().createIndexRow(nullArray, new ByteArraySearchRow(randomBytes(random, 10)));
-
- IndexRowWrapper entry2 = new IndexRowWrapper(storage, nullRow, nullArray);
-
- storage.put(entry1.row());
- storage.put(entry2.row());
+ int colCount = ((SortedIndexStorageDescriptor) indexStorage.indexDescriptor()).indexColumns().size();
- if (entry1.compareTo(entry2) > 0) {
- IndexRowWrapper t = entry2;
- entry2 = entry1;
- entry1 = t;
- }
-
- List<IndexRow> rows = cursorToList(storage.range(entry1::columns, entry2::columns));
+ List<IndexRow> actual = cursorToList(indexStorage.range(entry2.prefix(colCount), entry1.prefix(colCount), r -> true));
- assertThat(rows, contains(entry1.row(), entry2.row()));
+ assertThat(actual, is(empty()));
}
private List<ColumnDefinition> shuffledRandomDefinitions() {
@@ -400,12 +347,12 @@ public class RocksDbSortedIndexStorageTest {
private List<ColumnDefinition> shuffledDefinitions(Predicate<ColumnDefinition> filter) {
List<ColumnDefinition> shuffledDefinitions = ALL_TYPES_COLUMN_DEFINITIONS.stream()
.filter(filter)
- .collect(toList());
+ .collect(Collectors.toList());
Collections.shuffle(shuffledDefinitions, random);
if (log.isInfoEnabled()) {
- List<String> columnNames = shuffledDefinitions.stream().map(ColumnDefinition::name).collect(toList());
+ List<String> columnNames = shuffledDefinitions.stream().map(ColumnDefinition::name).collect(Collectors.toList());
log.info("Creating index with the following column order: " + columnNames);
}
@@ -432,13 +379,13 @@ public class RocksDbSortedIndexStorageTest {
indexStorage.put(entry2.row());
assertThat(
- getSingle(indexStorage, entry1).primaryKey().keyBytes(),
- is(equalTo(entry1.row().primaryKey().keyBytes()))
+ getSingle(indexStorage, entry1).primaryKey().bytes(),
+ is(equalTo(entry1.row().primaryKey().bytes()))
);
assertThat(
- getSingle(indexStorage, entry2).primaryKey().keyBytes(),
- is(equalTo(entry2.row().primaryKey().keyBytes()))
+ getSingle(indexStorage, entry2).primaryKey().bytes(),
+ is(equalTo(entry2.row().primaryKey().bytes()))
);
indexStorage.remove(entry1.row());
@@ -450,62 +397,17 @@ public class RocksDbSortedIndexStorageTest {
* Creates a Sorted Index using the given columns.
*/
private SortedIndexStorage createIndex(List<ColumnDefinition> indexSchema) {
- SortedIndexDefinitionBuilder indexDefinitionBuilder = SchemaBuilders.sortedIndex("foo");
-
- indexSchema.forEach(column -> {
- SortedIndexColumnBuilder columnBuilder = indexDefinitionBuilder.addIndexColumn(column.name());
-
- if (random.nextBoolean()) {
- columnBuilder.asc();
- } else {
- columnBuilder.desc();
- }
-
- columnBuilder.done();
- });
-
- SortedIndexDefinition indexDefinition = indexDefinitionBuilder.build();
-
- return createIndex(indexDefinition);
- }
-
- /**
- * Creates a Sorted Index using the given index definition.
- */
- private SortedIndexStorage createIndex(ColumnarIndexDefinition indexDefinition) {
- CompletableFuture<Void> createIndexFuture = tableCfg.change(cfg ->
- cfg.changeIndices(idxList ->
- idxList.create(indexDefinition.name(), idx -> convert(indexDefinition, idx))));
-
- assertThat(createIndexFuture, willBe(nullValue(Void.class)));
-
- return tableStorage.getOrCreateSortedIndex(indexDefinition.name());
- }
-
- /**
- * Extracts all data from a given cursor and closes it.
- */
- private static <T> List<T> cursorToList(Cursor<T> cursor) throws Exception {
- try (cursor) {
- var list = new ArrayList<T>();
+ List<SortedIndexColumnDescriptor> cols = new ArrayList<>();
- cursor.forEachRemaining(list::add);
+ for (int i = 0; i < indexSchema.size(); ++i) {
+ ColumnDefinition colDef = indexSchema.get(i);
- return list;
+ Column col = new Column(colDef.name(), SchemaDescriptorConverter.convert(colDef.type()), true);
+ cols.add(new SortedIndexColumnDescriptor(col, random.nextBoolean()));
}
- }
-
- /**
- * Extracts a single value by a given key or {@code null} if it does not exist.
- */
- @Nullable
- private static IndexRow getSingle(SortedIndexStorage indexStorage, IndexRowWrapper entry) throws Exception {
- IndexRowPrefix fullPrefix = entry::columns;
- List<IndexRow> values = cursorToList(indexStorage.range(fullPrefix, fullPrefix));
-
- assertThat(values, anyOf(empty(), hasSize(1)));
-
- return values.isEmpty() ? null : values.get(0);
+ return tableStorage.createSortedIndex(
+ new SortedIndexDescriptor("foo", cols, new Column[] {cols.get(0).column()})
+ );
}
}
diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowFactory.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/TestIndexRow.java
similarity index 50%
rename from modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowFactory.java
rename to modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/TestIndexRow.java
index bbb1396..8e98cdb 100644
--- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/index/IndexRowFactory.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/TestIndexRow.java
@@ -15,22 +15,45 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage.index;
+package org.apache.ignite.internal.storage.rocksdb.index;
-import org.apache.ignite.internal.storage.SearchRow;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.index.IndexRow;
+import org.apache.ignite.table.Tuple;
/**
- * Temporary API for creating Index rows from a list of column values. All columns must be sorted according to the index columns order,
- * specified by the {@link SortedIndexDescriptor#indexRowColumns()}.
+ * Index row for tests.
*/
-public interface IndexRowFactory {
- /**
- * Creates an Index row from a list of column values.
- */
- IndexRow createIndexRow(Object[] columnValues, SearchRow primaryKey);
+public class TestIndexRow implements IndexRow {
+ private final Tuple tuple;
+
+ private final BinaryRow pk;
+
+ private final int part;
/**
- * Creates an Prefix row from a list of column values.
+ * Constructor.
*/
- IndexRowPrefix createIndexRowPrefix(Object[] prefixColumnValues);
+ public TestIndexRow(Tuple t, BinaryRow pk, int part) {
+ this.tuple = t;
+ this.pk = pk;
+ this.part = part;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public BinaryRow primaryKey() {
+ return pk;
+ }
+
+ @Override
+ public int partition() {
+ return part;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Object value(int idxColOrder) {
+ return tuple.value(idxColOrder);
+ }
}
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java b/modules/table/src/main/java/org/apache/ignite/internal/table/StorageRowListener.java
similarity index 50%
copy from modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/StorageRowListener.java
index cda9d87..5e26181 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/schema/IgniteIndex.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/StorageRowListener.java
@@ -15,40 +15,39 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.sql.engine.schema;
+package org.apache.ignite.internal.table;
-import org.apache.calcite.rel.RelCollation;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
/**
- * Ignite scannable index.
+ * Listen storage changes.
*/
-public class IgniteIndex {
- private final RelCollation collation;
-
- private final String idxName;
-
- // private final GridIndex<H2Row> idx;
- private final InternalIgniteTable tbl;
+public interface StorageRowListener {
+ StorageRowListener NO_OP = new StorageRowListener() {
+ @Override
+ public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId) {
+ // No-op.
+ }
+
+ @Override
+ public void onRemove(BinaryRow row, int partId) {
+ // No-op.
+ }
+ };
/**
- * Constructor.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Called when row is updated.
+ *
+ * @param oldRow Old row.
+ * @param newRow New row.
*/
- public IgniteIndex(RelCollation collation, String name, InternalIgniteTable tbl) {
- this.collation = collation;
- idxName = name;
- this.tbl = tbl;
- }
-
- public RelCollation collation() {
- return collation;
- }
+ void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId);
- public String name() {
- return idxName;
- }
-
- public InternalIgniteTable table() {
- return tbl;
- }
+ /**
+ * Called when row is removed.
+ *
+ * @param row Removed row.
+ */
+ void onRemove(BinaryRow row, int partId);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 3687f78..9788386 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -17,8 +17,12 @@
package org.apache.ignite.internal.table;
+import java.util.Collections;
import java.util.Objects;
+import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerException;
import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
@@ -30,18 +34,21 @@ import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
/**
* Table view implementation for binary objects.
*/
-public class TableImpl implements Table {
+public class TableImpl implements Table, StorageRowListener {
/** Internal table. */
private final InternalTable tbl;
/** Schema registry. */
private final SchemaRegistry schemaReg;
+ private final Set<StorageRowListener> rowLstns = Collections.newSetFromMap(new ConcurrentHashMap<>());
+
/**
* Constructor.
*
@@ -123,4 +130,22 @@ public class TableImpl implements Table {
throw new IgniteInternalException(e);
}
}
+
+ public void addRowListener(StorageRowListener lsnr) {
+ rowLstns.add(lsnr);
+ }
+
+ public void removeRowListener(StorageRowListener lsnr) {
+ rowLstns.remove(lsnr);
+ }
+
+ @Override
+ public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId) {
+ rowLstns.forEach(l -> l.onUpdate(oldRow, newRow, partId));
+ }
+
+ @Override
+ public void onRemove(BinaryRow row, int partId) {
+ rowLstns.forEach(l -> l.onRemove(row, partId));
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableStorageRowListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableStorageRowListener.java
new file mode 100644
index 0000000..f5379be
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableStorageRowListener.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Listen storage changes and pass the updates to the {@link TableImpl}.
+ */
+public class TableStorageRowListener implements StorageRowListener {
+ private StorageRowListener delegate;
+
+ /** {@inheritDoc} */
+ @Override
+ public void onUpdate(@Nullable BinaryRow oldRow, BinaryRow newRow, int partId) {
+ if (delegate != null) {
+ delegate.onUpdate(oldRow, newRow, partId);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void onRemove(BinaryRow row, int partId) {
+ if (delegate != null) {
+ delegate.onRemove(row, partId);
+ }
+ }
+
+ /**
+ * Listen storage changes and pass the updates to the {@link TableImpl}.
+ */
+ public void listen(StorageRowListener lsnr) {
+ delegate = lsnr;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 7f3edc1..8743a97 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -59,9 +59,9 @@ import org.apache.ignite.internal.configuration.schema.ExtendedTableConfiguratio
import org.apache.ignite.internal.configuration.schema.ExtendedTableView;
import org.apache.ignite.internal.configuration.schema.SchemaConfiguration;
import org.apache.ignite.internal.configuration.schema.SchemaView;
+import org.apache.ignite.internal.manager.AbstractProducer;
import org.apache.ignite.internal.manager.EventListener;
import org.apache.ignite.internal.manager.IgniteComponent;
-import org.apache.ignite.internal.manager.Producer;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.SchemaException;
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.storage.rocksdb.RocksDbStorageEngine;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableStorageRowListener;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.VersionedRowStore;
@@ -103,7 +104,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Table manager.
*/
-public class TableManager extends Producer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal,
+public class TableManager extends AbstractProducer<TableEvent, TableEventParameters> implements IgniteTables, IgniteTablesInternal,
IgniteComponent {
/** The logger. */
private static final IgniteLogger LOG = IgniteLogger.forClass(TableManager.class);
@@ -319,7 +320,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
toAdd.removeAll(oldPartitionAssignment);
- InternalTable internalTable = tablesById.get(tblId).internalTable();
+ TableImpl tbl = tablesById.get(tblId);
+ InternalTable internalTable = tbl.internalTable();
// Create new raft nodes according to new assignments.
@@ -329,7 +331,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
newPartitionAssignment,
toAdd,
() -> new PartitionListener(tblId,
- new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager))
+ new VersionedRowStore(internalTable.storage().getOrCreatePartition(partId), txManager, tbl))
).thenAccept(
updatedRaftGroupService -> ((InternalTableImpl) internalTable).updateInternalTableRaftGroupService(
partId, updatedRaftGroupService)
@@ -477,6 +479,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
tableStorage.start();
+ TableStorageRowListener storageUpdLsnr = new TableStorageRowListener();
+
for (int p = 0; p < partitions; p++) {
int partId = p;
@@ -486,7 +490,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
raftGroupName(tblId, p),
assignment.get(p),
() -> new PartitionListener(tblId,
- new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager))
+ new VersionedRowStore(tableStorage.getOrCreatePartition(partId), txManager, storageUpdLsnr))
)
);
} catch (NodeStoppingException e) {
@@ -540,6 +544,8 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
schemaRegistry
);
+ storageUpdLsnr.listen(table);
+
tables.put(name, table);
tablesById.put(tblId, table);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
index 4b76ccd..5c3e29c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.storage.PartitionStorage;
import org.apache.ignite.internal.storage.SearchRow;
import org.apache.ignite.internal.storage.basic.BinarySearchRow;
import org.apache.ignite.internal.storage.basic.SimpleDataRow;
+import org.apache.ignite.internal.table.StorageRowListener;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxState;
@@ -51,6 +52,9 @@ public class VersionedRowStore {
/** Storage delegate. */
private final PartitionStorage storage;
+ /** Update listener. */
+ private final StorageRowListener lsnr;
+
/** Transaction manager. */
private TxManager txManager;
@@ -61,8 +65,19 @@ public class VersionedRowStore {
* @param txManager The TX manager.
*/
public VersionedRowStore(@NotNull PartitionStorage storage, @NotNull TxManager txManager) {
+ this(storage, txManager, StorageRowListener.NO_OP);
+ }
+
+ /**
+ * The constructor.
+ *
+ * @param storage The storage.
+ * @param txManager The TX manager.
+ */
+ public VersionedRowStore(@NotNull PartitionStorage storage, @NotNull TxManager txManager, StorageRowListener lsnr) {
this.storage = Objects.requireNonNull(storage);
this.txManager = Objects.requireNonNull(txManager);
+ this.lsnr = lsnr;
}
/**
@@ -127,6 +142,8 @@ public class VersionedRowStore {
Pair<BinaryRow, BinaryRow> pair = resolve(unpack(storage.read(key)), ts);
storage.write(pack(key, new Value(row, pair.getSecond(), ts)));
+
+ lsnr.onUpdate(pair.getSecond(), row, storage.partitionId());
}
/**
@@ -168,6 +185,8 @@ public class VersionedRowStore {
// Write a tombstone.
storage.write(pack(key, new Value(null, pair.getSecond(), ts)));
+ lsnr.onRemove(pair.getSecond(), storage.partitionId());
+
return true;
}
@@ -205,6 +224,8 @@ public class VersionedRowStore {
storage.write(pack(key, new Value(row, null, ts)));
+ lsnr.onUpdate(null, row, storage.partitionId());
+
return true;
}
diff --git a/parent/pom.xml b/parent/pom.xml
index 0d90766..9416062 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -312,6 +312,18 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-index</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-index-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- 3rd party dependencies -->
<dependency>
<groupId>org.jetbrains</groupId>
diff --git a/pom.xml b/pom.xml
index ac1f885..57cceab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,8 @@
<module>modules/configuration-annotation-processor</module>
<module>modules/configuration-api</module>
<module>modules/core</module>
+ <module>modules/index</module>
+ <module>modules/index-api</module>
<module>modules/marshaller-common</module>
<module>modules/metastorage</module>
<module>modules/metastorage-client</module>