You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by am...@apache.org on 2021/04/09 17:05:39 UTC
[ignite-3] branch main updated: IGNITE-14330: Table binary view
initial implementation. (#73)
This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 74e93ed IGNITE-14330: Table binary view initial implementation. (#73)
74e93ed is described below
commit 74e93eda95d22a5d2480953ed4166d9a7302a165
Author: Andrew V. Mashenkov <AM...@users.noreply.github.com>
AuthorDate: Fri Apr 9 20:05:32 2021 +0300
IGNITE-14330: Table binary view initial implementation. (#73)
---
.../org/apache/ignite/lang/IgniteException.java | 69 ++++
.../java/org/apache/ignite/lang/IgniteFuture.java | 27 --
.../ignite/table/InvokeProcessorException.java | 4 +-
.../java/org/apache/ignite/table/KeyValueView.java | 36 +-
.../java/org/apache/ignite/table/RecordView.java | 13 +
.../java/org/apache/ignite/table/TableView.java | 42 +--
modules/schema/pom.xml | 6 +
.../ignite/internal/schema/AssemblyException.java | 13 +-
.../apache/ignite/internal/schema/BinaryRow.java | 143 ++++++++
.../ignite/internal/schema/ByteBufferRow.java | 92 +++--
.../org/apache/ignite/internal/schema/Columns.java | 7 +
.../internal/schema/InvalidTypeException.java | 4 +-
.../org/apache/ignite/internal/schema/Row.java | 285 ++++++++++------
.../ignite/internal/schema/RowAssembler.java | 33 +-
.../ignite/internal/schema/SchemaDescriptor.java | 6 +-
.../schema/builder/SchemaTableBuilderImpl.java | 2 +-
.../schema/marshaller/AbstractSerializer.java | 6 +-
.../internal/schema/marshaller/MarshallerUtil.java | 3 +-
.../schema/marshaller/SerializationException.java | 4 +-
.../marshaller/asm/AsmSerializerGenerator.java | 5 +-
.../marshaller/asm/ColumnAccessCodeGenerator.java | 3 +-
.../asm/ObjectMarshallerCodeGenerator.java | 3 +-
.../modification/TableModificationBuilderImpl.java | 10 +-
.../apache/ignite/internal/util/ObjectFactory.java | 7 +-
.../org/apache/ignite/internal/schema/RowTest.java | 8 +-
.../apache/ignite/internal/schema/TestUtils.java | 2 +-
.../schema/marshaller/JavaSerializerTest.java | 3 +-
modules/table/pom.xml | 6 +
.../marshaller/KVSerializer.java} | 32 +-
.../{Marshaller.java => RecordSerializer.java} | 41 +--
.../schema/marshaller/TupleMarshaller.java} | 33 +-
.../ignite/internal/table/AbstractTableView.java | 65 ++++
.../ignite/internal/table/InternalTable.java | 153 +++++++++
.../ignite/internal/table/KVBinaryViewImpl.java | 292 ++++++++++++++++
.../apache/ignite/internal/table/KVViewImpl.java | 141 ++++----
.../internal/table/KeyValueBinaryViewImpl.java | 241 -------------
.../ignite/internal/table/RecordViewImpl.java | 170 +++++----
.../ignite/internal/table/RowChunkAdapter.java | 6 +-
.../apache/ignite/internal/table/TableImpl.java | 186 ++++++----
.../org/apache/ignite/internal/table/TableRow.java | 101 +++++-
.../ignite/internal/table/TableRowAdapter.java | 142 --------
.../{RowChunk.java => TableSchemaManager.java} | 16 +-
...{RowChunkAdapter.java => TupleBuilderImpl.java} | 70 ++--
.../ignite/internal/table/TupleMarshallerImpl.java | 98 ++++++
.../test/java/org/apache/ignite/table/Example.java | 9 +-
.../apache/ignite/table/KVViewOperationsTest.java | 380 +++++++++++++++++++++
.../table/TableBinaryViewOperationsTest.java | 348 +++++++++++++++++++
.../ignite/table/impl/DummyInternalTableImpl.java | 233 +++++++++++++
.../ignite/table/impl/DummySchemaManagerImpl.java} | 41 ++-
.../apache/ignite/table/impl/TestTableRowImpl.java | 132 -------
.../ignite/table/impl/TestTableStorageImpl.java | 82 -----
51 files changed, 2688 insertions(+), 1166 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java b/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
new file mode 100644
index 0000000..9afa9ba
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/lang/IgniteException.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.lang;
+
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * General Ignite exception. This exception is used to indicate any error condition within the node.
+ */
+public class IgniteException extends RuntimeException {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /**
+ * Creates an empty exception.
+ */
+ public IgniteException() {
+ // No-op.
+ }
+
+ /**
+ * Creates a new exception with the given error message.
+ *
+ * @param msg Error message.
+ */
+ public IgniteException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates a new exception with the given throwable as a cause and
+ * source of error message.
+ *
+ * @param cause Non-null throwable cause.
+ */
+ public IgniteException(Throwable cause) {
+ this(cause.getMessage(), cause);
+ }
+
+ /**
+ * Creates a new exception with the given error message and optional nested exception.
+ *
+ * @param msg Error message.
+ * @param cause Optional nested exception (can be {@code null}).
+ */
+ public IgniteException(String msg, @Nullable Throwable cause) {
+ super(msg, cause);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return getClass() + ": " + getMessage();
+ }
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/IgniteFuture.java b/modules/api/src/main/java/org/apache/ignite/lang/IgniteFuture.java
deleted file mode 100644
index aa27703..0000000
--- a/modules/api/src/main/java/org/apache/ignite/lang/IgniteFuture.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.lang;
-
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Future;
-
-/**
- * Future providing chaining capabilities for the construction of computation pipelines.
- */
-public interface IgniteFuture<T> extends CompletionStage<T>, Future<T> {
-}
diff --git a/modules/api/src/main/java/org/apache/ignite/table/InvokeProcessorException.java b/modules/api/src/main/java/org/apache/ignite/table/InvokeProcessorException.java
index 4c8b013..32f16c0 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/InvokeProcessorException.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/InvokeProcessorException.java
@@ -17,9 +17,11 @@
package org.apache.ignite.table;
+import org.apache.ignite.lang.IgniteException;
+
/**
* InvokeProcessor invocation exception.
*/
-public class InvokeProcessorException extends RuntimeException {
+public class InvokeProcessorException extends IgniteException {
}
diff --git a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
index cb86b25..32d1883 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
@@ -20,7 +20,7 @@ package org.apache.ignite.table;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import org.apache.ignite.lang.IgniteFuture;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.table.mapper.Mappers;
import org.jetbrains.annotations.NotNull;
@@ -47,7 +47,7 @@ public interface KeyValueView<K, V> {
* @param key The key whose associated value is to be returned.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<V> getAsync(K key);
+ @NotNull CompletableFuture<V> getAsync(K key);
/**
* Get values associated with given keys.
@@ -63,7 +63,7 @@ public interface KeyValueView<K, V> {
* @param keys Keys whose associated values are to be returned.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Map<K, V>> getAllAsync(Collection<K> keys);
+ @NotNull CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys);
/**
* Determines if the table contains an entry for the specified key.
@@ -88,7 +88,7 @@ public interface KeyValueView<K, V> {
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Void> putAsync(K key, V val);
+ @NotNull CompletableFuture<Void> putAsync(K key, V val);
/**
* Put associated key-value pairs.
@@ -103,7 +103,7 @@ public interface KeyValueView<K, V> {
* @param pairs Key-value pairs.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Void> putAllAsync(Map<K, V> pairs);
+ @NotNull CompletableFuture<Void> putAllAsync(Map<K, V> pairs);
/**
* Puts new or replaces existed value associated with given key into the table.
@@ -121,7 +121,7 @@ public interface KeyValueView<K, V> {
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<V> getAndPutAsync(K key, V val);
+ @NotNull CompletableFuture<V> getAndPutAsync(K key, V val);
/**
* Puts value associated with given key into the table if not exists.
@@ -130,7 +130,7 @@ public interface KeyValueView<K, V> {
* @param val Value to be associated with the specified key.
* @return {@code True} if successful, {@code false} otherwise.
*/
- boolean putIfAbsent(K key, V val);
+ boolean putIfAbsent(K key, @NotNull V val);
/**
* Asynchronously puts value associated with given key into the table if not exists.
@@ -139,7 +139,7 @@ public interface KeyValueView<K, V> {
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> putIfAbsentAsync(K key, V val);
+ @NotNull CompletableFuture<Boolean> putIfAbsentAsync(K key, V val);
/**
* Removes value associated with given key from the table.
@@ -155,7 +155,7 @@ public interface KeyValueView<K, V> {
* @param key Key whose mapping is to be removed from the table.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> removeAsync(K key);
+ @NotNull CompletableFuture<Boolean> removeAsync(K key);
/**
* Removes expected value associated with given key from the table.
@@ -164,7 +164,7 @@ public interface KeyValueView<K, V> {
* @param val Expected value.
* @return {@code True} if the expected value for the specified key was successfully removed, {@code false} otherwise.
*/
- boolean remove(K key, V val);
+ boolean remove(K key, @NotNull V val);
/**
* Asynchronously removes expected value associated with given key from the table.
@@ -173,7 +173,7 @@ public interface KeyValueView<K, V> {
* @param val Expected value.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> removeAsync(K key, V val);
+ @NotNull CompletableFuture<Boolean> removeAsync(K key, V val);
/**
* Remove values associated with given keys from the table.
@@ -189,7 +189,7 @@ public interface KeyValueView<K, V> {
* @param keys Keys whose mapping is to be removed from the table.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<K> removeAllAsync(Collection<K> keys);
+ @NotNull CompletableFuture<K> removeAllAsync(Collection<K> keys);
/**
* Gets then removes value associated with given key from the table.
@@ -205,7 +205,7 @@ public interface KeyValueView<K, V> {
* @param key Key whose mapping is to be removed from the table.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<V> getAndRemoveAsync(K key);
+ @NotNull CompletableFuture<V> getAndRemoveAsync(K key);
/**
* Replaces the value for a key only if exists. This is equivalent to
@@ -232,7 +232,7 @@ public interface KeyValueView<K, V> {
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> replaceAsync(K key, V val);
+ @NotNull CompletableFuture<Boolean> replaceAsync(K key, V val);
/**
* Replaces the expected value for a key. This is equivalent to
@@ -261,7 +261,7 @@ public interface KeyValueView<K, V> {
* @param newVal Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal);
+ @NotNull CompletableFuture<Boolean> replaceAsync(K key, V oldVal, V newVal);
/**
* Replaces the value for a given key only if exists. This is equivalent to
@@ -290,7 +290,7 @@ public interface KeyValueView<K, V> {
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<V> getAndReplaceAsync(K key, V val);
+ @NotNull CompletableFuture<V> getAndReplaceAsync(K key, V val);
/**
* Executes invoke processor code against the value associated with the provided key.
@@ -314,7 +314,7 @@ public interface KeyValueView<K, V> {
* @return Future representing pending completion of the operation.
* @see InvokeProcessor
*/
- @NotNull <R extends Serializable> IgniteFuture<R> invokeAsync(K key, InvokeProcessor<K, V, R> proc,
+ @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(K key, InvokeProcessor<K, V, R> proc,
Serializable... args);
/**
@@ -342,7 +342,7 @@ public interface KeyValueView<K, V> {
* @return Future representing pending completion of the operation.
* @see InvokeProcessor
*/
- @NotNull <R extends Serializable> IgniteFuture<Map<K, R>> invokeAllAsync(
+ @NotNull <R extends Serializable> CompletableFuture<Map<K, R>> invokeAllAsync(
Collection<K> keys,
InvokeProcessor<K, V, R> proc,
Serializable... args);
diff --git a/modules/api/src/main/java/org/apache/ignite/table/RecordView.java b/modules/api/src/main/java/org/apache/ignite/table/RecordView.java
index 2ad84d1..959f8dc 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/RecordView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/RecordView.java
@@ -17,11 +17,13 @@
package org.apache.ignite.table;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.table.mapper.Mappers;
/**
* Record view of table provides methods to access table records.
* <p>
+ *
* @param <R> Record type.
* @apiNote 'Record class field' >-< 'table column' mapping laid down in implementation.
* @apiNote Some methods require a record with the only key fields set. This is not mandatory requirement
@@ -39,4 +41,15 @@ public interface RecordView<R> extends TableView<R> {
* @return Record with all fields filled from the table.
*/
R fill(R recObjToFill);
+
+ /**
+ * Asynchronously fills given record with the values from the table.
+ * Similar to {@link #get(Object)}, but returns the original object with its value fields filled.
+ * <p>
+ * All value fields of given object will be rewritten.
+ *
+ * @param recObjToFill Record object with key fields to be filled.
+ * @return Future representing pending completion of the operation.
+ */
+ CompletableFuture<R> fillAsync(R recObjToFill);
}
diff --git a/modules/api/src/main/java/org/apache/ignite/table/TableView.java b/modules/api/src/main/java/org/apache/ignite/table/TableView.java
index ebe6bb3..bb50a08 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/TableView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/TableView.java
@@ -20,7 +20,7 @@ package org.apache.ignite.table;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import org.apache.ignite.lang.IgniteFuture;
+import java.util.concurrent.CompletableFuture;
import org.jetbrains.annotations.NotNull;
/**
@@ -45,7 +45,7 @@ public interface TableView<R> {
* @param keyRec Record with key columns set.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<R> getAsync(R keyRec);
+ @NotNull CompletableFuture<R> getAsync(R keyRec);
/**
* Get records from the table.
@@ -61,7 +61,7 @@ public interface TableView<R> {
* @param keyRecs Records with key columns set.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Collection<R>> getAllAsync(Collection<R> keyRecs);
+ @NotNull CompletableFuture<Collection<R>> getAllAsync(Collection<R> keyRecs);
/**
* Inserts a record into the table if does not exist or replaces the existed one.
@@ -76,7 +76,7 @@ public interface TableView<R> {
* @param rec Record to insert into the table.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Void> upsertAsync(R rec);
+ @NotNull CompletableFuture<Void> upsertAsync(R rec);
/**
* Insert records into the table if does not exist or replaces the existed one.
@@ -91,7 +91,7 @@ public interface TableView<R> {
* @param recs Records to insert into the table.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Void> upsertAllAsync(Collection<R> recs);
+ @NotNull CompletableFuture<Void> upsertAllAsync(Collection<R> recs);
/**
* Inserts a record into the table or replaces if exists and return replaced previous record.
@@ -107,7 +107,7 @@ public interface TableView<R> {
* @param rec Record to insert into the table.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<R> getAndUpsertAsync(R rec);
+ @NotNull CompletableFuture<R> getAndUpsertAsync(R rec);
/**
* Inserts a record into the table if not exists.
@@ -123,7 +123,7 @@ public interface TableView<R> {
* @param rec Record to insert into the table.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> insertAsync(R rec);
+ @NotNull CompletableFuture<Boolean> insertAsync(R rec);
/**
* Insert records into the table which do not exist, skipping existed ones.
@@ -139,7 +139,7 @@ public interface TableView<R> {
* @param recs Records to insert into the table.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Collection<R>> insertAllAsync(Collection<R> recs);
+ @NotNull CompletableFuture<Collection<R>> insertAllAsync(Collection<R> recs);
/**
* Replaces an existed record associated with the same key columns values as the given one has.
@@ -155,7 +155,7 @@ public interface TableView<R> {
* @param rec Record to replace with.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> replaceAsync(R rec);
+ @NotNull CompletableFuture<Boolean> replaceAsync(R rec);
/**
* Replaces an expected record in the table with the given new one.
@@ -173,7 +173,7 @@ public interface TableView<R> {
* @param newRec Record to replace with.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> replaceAsync(R oldRec, R newRec);
+ @NotNull CompletableFuture<Boolean> replaceAsync(R oldRec, R newRec);
/**
* Gets an existed record associated with the same key columns values as the given one has,
@@ -191,7 +191,7 @@ public interface TableView<R> {
* @param rec Record to replace with.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<R> getAndReplaceAsync(R rec);
+ @NotNull CompletableFuture<R> getAndReplaceAsync(R rec);
/**
* Deletes a record with the same key columns values as the given one from the table.
@@ -207,23 +207,23 @@ public interface TableView<R> {
* @param keyRec Record with key columns set.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> deleteAsync(R keyRec);
+ @NotNull CompletableFuture<Boolean> deleteAsync(R keyRec);
/**
* Deletes the given record from the table.
*
- * @param oldRec Record to delete.
+ * @param rec Record to delete.
* @return {@code True} if removed successfully, {@code false} otherwise.
*/
- boolean deleteExact(R oldRec);
+ boolean deleteExact(R rec);
/**
* Asynchronously deletes given record from the table.
*
- * @param oldRec Record to delete.
+ * @param rec Record to delete.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Boolean> deleteExactAsync(R oldRec);
+ @NotNull CompletableFuture<Boolean> deleteExactAsync(R rec);
/**
* Gets then deletes a record with the same key columns values from the table.
@@ -239,7 +239,7 @@ public interface TableView<R> {
* @param rec Record with key columns set.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<R> getAndDeleteAsync(R rec);
+ @NotNull CompletableFuture<R> getAndDeleteAsync(R rec);
/**
* Remove records with the same key columns values as the given one has from the table.
@@ -255,7 +255,7 @@ public interface TableView<R> {
* @param recs Records with key columns set.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Collection<R>> deleteAllAsync(Collection<R> recs);
+ @NotNull CompletableFuture<Collection<R>> deleteAllAsync(Collection<R> recs);
/**
* Remove given records from the table.
@@ -271,7 +271,7 @@ public interface TableView<R> {
* @param recs Records to delete.
* @return Future representing pending completion of the operation.
*/
- @NotNull IgniteFuture<Collection<R>> deleteAllExactAsync(Collection<R> recs);
+ @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(Collection<R> recs);
/**
* Executes an InvokeProcessor code against a record with the same key columns values as the given one has.
@@ -288,7 +288,7 @@ public interface TableView<R> {
* @param keyRec Record with key columns set.
* @return Future representing pending completion of the operation.
*/
- @NotNull <T extends Serializable> IgniteFuture<T> invokeAsync(R keyRec, InvokeProcessor<R, R, T> proc);
+ @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(R keyRec, InvokeProcessor<R, R, T> proc);
/**
* Executes an InvokeProcessor code against records with the same key columns values as the given ones has.
@@ -304,6 +304,6 @@ public interface TableView<R> {
* @param keyRecs Records with key columns set.
* @return Results of the processing.
*/
- @NotNull <T extends Serializable> IgniteFuture<Map<R, T>> invokeAllAsync(Collection<R> keyRecs,
+ @NotNull <T extends Serializable> CompletableFuture<Map<R, T>> invokeAllAsync(Collection<R> keyRecs,
InvokeProcessor<R, R, T> proc);
}
diff --git a/modules/schema/pom.xml b/modules/schema/pom.xml
index 19a2171..390af15 100644
--- a/modules/schema/pom.xml
+++ b/modules/schema/pom.xml
@@ -41,6 +41,12 @@
<dependency>
<groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
<artifactId>ignite-bytecode</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java
index b62fc94..d25815d 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/AssemblyException.java
@@ -17,11 +17,20 @@
package org.apache.ignite.internal.schema;
+import org.apache.ignite.lang.IgniteInternalException;
+
/**
- * The exception is thrown when the row assembler encountered an unrecoverable error during the field encoding.
+ * The exception is thrown when the row assembler encountered an unrecoverable error during the row marshalling.
* After the exception is thrown, the assembler remains in an invalid state and should be discarded.
*/
-public class AssemblyException extends RuntimeException {
+public class AssemblyException extends IgniteInternalException {
+ /**
+ * @param errMsg Error message
+ */
+ public AssemblyException(String errMsg) {
+ super(errMsg);
+ }
+
/**
* @param errMsg Error message
* @param cause Cause for this error.
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
new file mode 100644
index 0000000..0bf67b4
--- /dev/null
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.schema;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Binary row interface.
+ * The class contains low-level methods to read row data.
+ */
+public interface BinaryRow {
+ /** */
+ int SCHEMA_VERSION_OFFSET = 0;
+ /** */
+ int FLAGS_FIELD_OFFSET = SCHEMA_VERSION_OFFSET + 2;
+ /** */
+ int KEY_HASH_FIELD_OFFSET = FLAGS_FIELD_OFFSET + 2;
+ /** */
+ int KEY_CHUNK_OFFSET = KEY_HASH_FIELD_OFFSET + 4;
+ /** */
+ int TOTAL_LEN_FIELD_SIZE = 4;
+ /** */
+ int VARLEN_TABLE_SIZE_FIELD_SIZE = 2;
+ /** */
+ int VARLEN_COLUMN_OFFSET_FIELD_SIZE = 2;
+
+ /**
+ * @return Row schema version.
+ */
+ int schemaVersion();
+
+ /**
+ * @return {@code True} if row has non-null value, {@code false} otherwise.
+ */
+ boolean hasValue();
+
+ // TODO: IGNITE-14199. Add row version.
+ //GridRowVersion version();
+
+ /**
+ * The row hash code is the result of a hash function applied to the values of the row's affinity columns.
+ *
+ * @return Row hash code.
+ */
+ int hash();
+
+ /**
+ * @return ByteBuffer slice representing the key chunk.
+ */
+ ByteBuffer keySlice();
+
+ /**
+ * @return ByteBuffer slice representing the value chunk.
+ */
+ ByteBuffer valueSlice();
+
+ /**
+ * Writes binary row to given stream.
+ *
+ * @throws IOException If write operation fails.
+ */
+ void writeTo(OutputStream stream) throws IOException;
+
+ /**
+ * @param off Offset.
+ * @return Byte primitive value.
+ */
+ byte readByte(int off);
+
+ /**
+ * @param off Offset.
+ * @return Short primitive value.
+ */
+ short readShort(int off);
+
+ /**
+ * @param off Offset.
+ * @return Integer primitive value.
+ */
+ int readInteger(int off);
+
+ /**
+ * @param off Offset.
+ * @return Long primitive value.
+ */
+ long readLong(int off);
+
+ /**
+ * @param off Offset.
+ * @return Float primitive value.
+ */
+ float readFloat(int off);
+
+ /**
+ * @param off Offset.
+ * @return Double primitive value.
+ */
+ double readDouble(int off);
+
+ /**
+ * @param off Offset.
+ * @return String value.
+ */
+ String readString(int off, int len);
+
+ /**
+ * @param off Offset.
+ * @return Byte array.
+ */
+ byte[] readBytes(int off, int len);
+
+ /**
+ * Bit masks for the row flags field.
+ */
+ final class RowFlags {
+ /** Tombstone flag. */
+ public static final int TOMBSTONE = 1;
+
+ /** Null-value flag. */
+ public static final int NULL_VALUE = 1 << 1;
+
+ /** Stub. */
+ private RowFlags() {
+ }
+ }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 001346a..86672c7 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.schema;
+import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
@@ -24,52 +26,82 @@ import java.nio.charset.StandardCharsets;
/**
* Heap byte buffer-based row.
*/
-public class ByteBufferRow extends Row {
- /** */
+public class ByteBufferRow implements BinaryRow {
+ /** Row buffer. */
private final ByteBuffer buf;
/**
- * @param arr Array representation of the row.
+ * @param data Array representation of the row.
*/
- public ByteBufferRow(SchemaDescriptor sch, byte[] arr) {
- super(sch);
+ public ByteBufferRow(byte[] data) {
+ this(ByteBuffer.wrap(data).order(ByteOrder.LITTLE_ENDIAN));
+ }
+
+ /**
+ * Constructor.
+ *
+ * @param buf Buffer representing the row.
+ */
+ public ByteBufferRow(ByteBuffer buf) {
+ assert buf.order() == ByteOrder.LITTLE_ENDIAN;
+
+ this.buf = buf;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int schemaVersion() {
+ return Short.toUnsignedInt(readShort(SCHEMA_VERSION_OFFSET));
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasValue() {
+ short flags = readShort(FLAGS_FIELD_OFFSET);
+
+ return (flags & (RowFlags.NULL_VALUE | RowFlags.TOMBSTONE)) == 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hash() {
+ return readInteger(KEY_HASH_FIELD_OFFSET);
+ }
- buf = ByteBuffer.wrap(arr);
- buf.order(ByteOrder.LITTLE_ENDIAN);
+ /** {@inheritDoc} */
+ @Override public void writeTo(OutputStream stream) throws IOException {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override protected byte readByte(int off) {
+ @Override public byte readByte(int off) {
return (byte)(buf.get(off) & 0xFF);
}
/** {@inheritDoc} */
- @Override protected short readShort(int off) {
+ @Override public short readShort(int off) {
return (short)(buf.getShort(off) & 0xFFFF);
}
/** {@inheritDoc} */
- @Override protected int readInteger(int off) {
+ @Override public int readInteger(int off) {
return buf.getInt(off);
}
/** {@inheritDoc} */
- @Override protected long readLong(int off) {
+ @Override public long readLong(int off) {
return buf.getLong(off);
}
/** {@inheritDoc} */
- @Override protected float readFloat(int off) {
+ @Override public float readFloat(int off) {
return buf.getFloat(off);
}
/** {@inheritDoc} */
- @Override protected double readDouble(int off) {
+ @Override public double readDouble(int off) {
return buf.getDouble(off);
}
/** {@inheritDoc} */
- @Override protected byte[] readBytes(int off, int len) {
+ @Override public byte[] readBytes(int off, int len) {
try {
byte[] res = new byte[len];
@@ -85,27 +117,35 @@ public class ByteBufferRow extends Row {
}
/** {@inheritDoc} */
- @Override protected String readString(int off, int len) {
+ @Override public String readString(int off, int len) {
return new String(buf.array(), off, len, StandardCharsets.UTF_8);
}
/** {@inheritDoc} */
- @Override public byte[] rowBytes() {
- return buf.array();
- }
+ @Override public ByteBuffer keySlice() {
+ final int off = KEY_CHUNK_OFFSET;
+ final int len = readInteger(off); // The chunk length is read from the chunk's first 4 bytes.
- /** {@inheritDoc} */
- @Override public byte[] keyChunkBytes() {
- final int len = readInteger(KEY_CHUNK_OFFSET);
-
- return readBytes(KEY_HASH_FIELD_OFFSET, len); // Includes key-hash.
+ try { // Temporarily narrow the shared buffer to the key chunk. NOTE(review): mutates buf bounds; confirm no concurrent readers.
+ return buf.limit(off + len).position(off).slice().order(ByteOrder.LITTLE_ENDIAN); // slice() always yields BIG_ENDIAN; restore the row byte order.
+ }
+ finally {
+ buf.position(0); // Reset bounds.
+ buf.limit(buf.capacity());
+ }
}
/** {@inheritDoc} */
- @Override public byte[] valueChunkBytes() {
+ @Override public ByteBuffer valueSlice() {
int off = KEY_CHUNK_OFFSET + readInteger(KEY_CHUNK_OFFSET);
- int len = readInteger(off);
+ int len = hasValue() ? readInteger(off) : 0; // A row without a value yields an empty slice.
- return readBytes(off, len);
+ try { // Temporarily narrow the shared buffer to the value chunk. NOTE(review): mutates buf bounds; confirm no concurrent readers.
+ return buf.limit(off + len).position(off).slice().order(ByteOrder.LITTLE_ENDIAN); // slice() always yields BIG_ENDIAN; restore the row byte order.
+ }
+ finally {
+ buf.position(0); // Reset bounds.
+ buf.limit(buf.capacity());
+ }
}
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Columns.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Columns.java
index 6a3f1df..669ed6d 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Columns.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Columns.java
@@ -139,6 +139,13 @@ public class Columns {
}
/**
+ * @return Sorted columns. NOTE(review): returns the {@code cols} reference itself, not a copy - confirm callers never modify it.
+ */
+ public Column[] columns() {
+ return cols;
+ }
+
+ /**
* @return Number of columns in this chunk.
*/
public int length() {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/InvalidTypeException.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/InvalidTypeException.java
index b4345b3..390a2d9 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/InvalidTypeException.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/InvalidTypeException.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.schema;
+import org.apache.ignite.lang.IgniteInternalException;
+
/**
* An exception thrown when an attempt to read an invalid type from a row is performed.
*/
-public class InvalidTypeException extends IllegalArgumentException {
+public class InvalidTypeException extends IgniteInternalException {
/**
* @param msg Error message.
*/
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
index 71dce1d..a074733 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/Row.java
@@ -17,167 +17,217 @@
package org.apache.ignite.internal.schema;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.UUID;
/**
+ * Schema-aware row.
+ *
* The class contains non-generic methods to read boxed and unboxed primitives based on the schema column types.
* Any type conversions and coercions should be implemented outside the row by the key-value or query runtime.
* When a non-boxed primitive is read from a null column value, it is converted to the primitive type default value.
*/
-public abstract class Row {
- /** */
- public static final int SCHEMA_VERSION_OFFSET = 0;
-
- /** */
- public static final int FLAGS_FIELD_OFFSET = SCHEMA_VERSION_OFFSET + 2;
-
- /** */
- public static final int KEY_HASH_FIELD_OFFSET = FLAGS_FIELD_OFFSET + 2;
-
- /** */
- public static final int KEY_CHUNK_OFFSET = KEY_HASH_FIELD_OFFSET + 4;
-
- /** */
- public static final int TOTAL_LEN_FIELD_SIZE = 4;
-
- /** */
- public static final int VARLEN_TABLE_SIZE_FIELD_SIZE = 2;
-
- /** */
- public static final int VARLEN_COLUMN_OFFSET_FIELD_SIZE = 2;
+public class Row implements BinaryRow {
+ /** Schema descriptor. */
+ private final SchemaDescriptor schema;
- /** */
- public static final class RowFlags {
- /** Tombstone flag. */
- public static final int TOMBSTONE = 1;
+ /** Binary row. */
+ private final BinaryRow row;
- /** Null-value flag. */
- public static final int NULL_VALUE = 1 << 1;
+ /**
+ * Constructor.
+ *
+ * @param schema Schema.
+ * @param row Binary row representation.
+ */
+ public Row(SchemaDescriptor schema, BinaryRow row) {
+ assert row.schemaVersion() == schema.version();
- /** Stub. */
- private RowFlags() {
- }
+ this.row = row;
+ this.schema = schema;
}
- /** Schema descriptor for which this row was created. */
- private final SchemaDescriptor schema;
-
/**
- * @param schema Schema instance.
+ * @return Row schema.
*/
- protected Row(SchemaDescriptor schema) {
- this.schema = schema;
+ public SchemaDescriptor rowSchema() {
+ return schema;
}
/**
* @return {@code True} if row has non-null value, {@code false} otherwise.
*/
- public boolean hasValue() {
- short flags = readShort(FLAGS_FIELD_OFFSET);
-
- return (flags & (RowFlags.NULL_VALUE | RowFlags.TOMBSTONE)) == 0;
+ @Override public boolean hasValue() {
+ return row.hasValue();
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public byte byteValue(int col) {
+ public byte byteValue(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.BYTE);
return off < 0 ? 0 : readByte(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public Byte byteValueBoxed(int col) {
+ public Byte byteValueBoxed(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.BYTE);
return off < 0 ? null : readByte(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public short shortValue(int col) {
+ public short shortValue(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.SHORT);
return off < 0 ? 0 : readShort(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public Short shortValueBoxed(int col) {
+ public Short shortValueBoxed(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.SHORT);
return off < 0 ? null : readShort(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public int intValue(int col) {
+ public int intValue(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.INTEGER);
return off < 0 ? 0 : readInteger(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public Integer intValueBoxed(int col) {
+ public Integer intValueBoxed(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.INTEGER);
return off < 0 ? null : readInteger(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public long longValue(int col) {
+ public long longValue(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.LONG);
return off < 0 ? 0 : readLong(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public Long longValueBoxed(int col) {
+ public Long longValueBoxed(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.LONG);
return off < 0 ? null : readLong(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public float floatValue(int col) {
+ public float floatValue(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.FLOAT);
return off < 0 ? 0.f : readFloat(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public Float floatValueBoxed(int col) {
+ public Float floatValueBoxed(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.FLOAT);
return off < 0 ? null : readFloat(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public double doubleValue(int col) {
+ public double doubleValue(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.DOUBLE);
return off < 0 ? 0.d : readDouble(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public Double doubleValueBoxed(int col) {
+ public Double doubleValueBoxed(int col) throws InvalidTypeException {
long off = findColumn(col, NativeTypeSpec.DOUBLE);
return off < 0 ? null : readDouble(offset(off));
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public String stringValue(int col) {
+ public String stringValue(int col) throws InvalidTypeException {
long offLen = findColumn(col, NativeTypeSpec.STRING);
if (offLen < 0)
@@ -190,8 +240,13 @@ public abstract class Row {
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public byte[] bytesValue(int col) {
+ public byte[] bytesValue(int col) throws InvalidTypeException {
long offLen = findColumn(col, NativeTypeSpec.BYTES);
if (offLen < 0)
@@ -204,8 +259,13 @@ public abstract class Row {
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public UUID uuidValue(int col) {
+ public UUID uuidValue(int col) throws InvalidTypeException {
long found = findColumn(col, NativeTypeSpec.UUID);
if (found < 0)
@@ -220,18 +280,22 @@ public abstract class Row {
}
/**
+ * Reads value for specified column.
+ *
+ * @param col Column index.
+ * @return Column value.
+ * @throws InvalidTypeException If actual column type does not match the requested column type.
*/
- public BitSet bitmaskValue(int colIdx) {
- long offLen = findColumn(colIdx, NativeTypeSpec.BITMASK);
+ public BitSet bitmaskValue(int col) throws InvalidTypeException {
+ long offLen = findColumn(col, NativeTypeSpec.BITMASK);
if (offLen < 0)
return null;
int off = offset(offLen);
+ int len = columnLength(col);
- Column col = schema.column(colIdx);
-
- return BitSet.valueOf(readBytes(off, col.type().length()));
+ return BitSet.valueOf(readBytes(off, len));
}
/**
@@ -249,9 +313,9 @@ public abstract class Row {
* @see #length(long)
* @see InvalidTypeException If actual column type does not match the requested column type.
*/
- private long findColumn(int colIdx, NativeTypeSpec type) {
+ protected long findColumn(int colIdx, NativeTypeSpec type) throws InvalidTypeException {
// Get base offset (key start or value start) for the given column.
- boolean keyCol = schema.keyColumn(colIdx);
+ boolean keyCol = schema.isKeyColumn(colIdx);
Columns cols = keyCol ? schema.keyColumns() : schema.valueColumns();
int off = KEY_CHUNK_OFFSET;
@@ -279,6 +343,16 @@ public abstract class Row {
}
/**
+ * @param colIdx Column index.
+ * @return Column length.
+ */
+ private int columnLength(int colIdx) {
+ Column col = schema.column(colIdx);
+
+ return col.type().length();
+ }
+
+ /**
* Checks the row's null map for the given column index in the chunk.
*
* @param baseOff Offset of the chunk start in the row.
@@ -417,47 +491,68 @@ public abstract class Row {
return baseOff + TOTAL_LEN_FIELD_SIZE + VARLEN_TABLE_SIZE_FIELD_SIZE;
}
- /**
- */
- protected abstract byte readByte(int off);
+ /** {@inheritDoc} */
+ @Override public int schemaVersion() {
+ return row.schemaVersion();
+ }
- /**
- */
- protected abstract short readShort(int off);
+ /** {@inheritDoc} */
+ @Override public int hash() {
+ return row.hash();
+ }
- /**
- */
- protected abstract int readInteger(int off);
+ /** {@inheritDoc} */
+ @Override public ByteBuffer keySlice() {
+ return row.keySlice();
+ }
- /**
- */
- protected abstract long readLong(int off);
+ /** {@inheritDoc} */
+ @Override public ByteBuffer valueSlice() {
+ return row.valueSlice();
+ }
- /**
- */
- protected abstract float readFloat(int off);
+ /** {@inheritDoc} */
+ @Override public void writeTo(OutputStream stream) throws IOException {
+ row.writeTo(stream);
+ }
- /**
- */
- protected abstract double readDouble(int off);
+ /** {@inheritDoc} */
+ @Override public byte readByte(int off) {
+ return row.readByte(off);
+ }
- /**
- */
- protected abstract String readString(int off, int len);
+ /** {@inheritDoc} */
+ @Override public short readShort(int off) {
+ return row.readShort(off);
+ }
- /**
- */
- protected abstract byte[] readBytes(int off, int len);
+ /** {@inheritDoc} */
+ @Override public int readInteger(int off) {
+ return row.readInteger(off);
+ }
- /**
- */
- public abstract byte[] rowBytes();
+ /** {@inheritDoc} */
+ @Override public long readLong(int off) {
+ return row.readLong(off);
+ }
- /**
- */
- public abstract byte[] keyChunkBytes();
+ /** {@inheritDoc} */
+ @Override public float readFloat(int off) {
+ return row.readFloat(off);
+ }
- /**
- */
- public abstract byte[] valueChunkBytes();
+ /** {@inheritDoc} */
+ @Override public double readDouble(int off) {
+ return row.readDouble(off);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String readString(int off, int len) {
+ return row.readString(off, len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] readBytes(int off, int len) {
+ return row.readBytes(off, len);
+ }
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
index 9095596..65f58ee 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/RowAssembler.java
@@ -35,9 +35,7 @@ import java.util.UUID;
* @see #utf8EncodedLength(CharSequence)
*/
public class RowAssembler {
- /**
- *
- */
+ /** Schema. */
private final SchemaDescriptor schema;
/** The number of non-null varlen columns in values chunk. */
@@ -67,6 +65,9 @@ public class RowAssembler {
/** Offset of the varlen table for current chunk. */
private int varlenTblOff;
+ /** Flags. */
+ private short flags;
+
/** Charset encoder for strings. Initialized lazily. */
private CharsetEncoder strEncoder;
@@ -109,7 +110,7 @@ public class RowAssembler {
* @return Row's chunk size.
*/
public static int rowChunkSize(Columns cols, int nonNullVarlenCols, int nonNullVarlenSize) {
- int size = Row.TOTAL_LEN_FIELD_SIZE + Row.VARLEN_TABLE_SIZE_FIELD_SIZE +
+ int size = BinaryRow.TOTAL_LEN_FIELD_SIZE + BinaryRow.VARLEN_TABLE_SIZE_FIELD_SIZE +
varlenTableSize(nonNullVarlenCols) + cols.nullMapSize();
for (int i = 0; i < cols.numberOfFixsizeColumns(); i++)
@@ -139,9 +140,10 @@ public class RowAssembler {
curCols = schema.keyColumns();
- initOffsets(Row.KEY_CHUNK_OFFSET, nonNullVarlenKeyCols);
+ initOffsets(BinaryRow.KEY_CHUNK_OFFSET, nonNullVarlenKeyCols);
buf.putShort(0, (short)schema.version());
+ buf.putShort(baseOff + BinaryRow.TOTAL_LEN_FIELD_SIZE, (short)nonNullVarlenKeyCols);
}
/**
@@ -161,7 +163,7 @@ public class RowAssembler {
int nonNullVarlenValCols,
int nonNullVarlenValSize
) {
- return Row.KEY_CHUNK_OFFSET +
+ return BinaryRow.KEY_CHUNK_OFFSET +
rowChunkSize(keyCols, nonNullVarlenKeyCols, nonNullVarlenKeySize) +
rowChunkSize(valCols, nonNullVarlenValCols, nonNullVarlenValSize);
}
@@ -338,6 +340,17 @@ public class RowAssembler {
* @return Serialized row.
*/
public byte[] build() {
+ if (schema.keyColumns() == curCols) // Still positioned on the key columns: the key chunk was never completed.
+ throw new AssemblyException("Key column missed: colIdx=" + curCol);
+ else {
+ if (curCol == 0) // No value column was appended: flag the row as having a null value.
+ flags |= BinaryRow.RowFlags.NULL_VALUE;
+ else if (schema.valueColumns().length() != curCol) // Value chunk was started but not every column was appended.
+ throw new AssemblyException("Value column missed: colIdx=" + curCol);
+ }
+
+ buf.putShort(BinaryRow.FLAGS_FIELD_OFFSET, flags); // Flags are written last, once the row shape is known.
+
return buf.toArray();
}
@@ -425,8 +438,10 @@ public class RowAssembler {
buf.putShort(baseOff, (short)keyLen);
- if (schema.valueColumns() == curCols)
+ if (schema.valueColumns() == curCols) {
+ buf.putShort(baseOff + BinaryRow.TOTAL_LEN_FIELD_SIZE, (short)nonNullVarlenValCols);
return; // No more columns.
+ }
curCols = schema.valueColumns(); // Switch key->value columns.
@@ -444,9 +459,7 @@ public class RowAssembler {
curCol = 0;
curVarlenTblEntry = 0;
- buf.putShort(baseOff + Row.TOTAL_LEN_FIELD_SIZE, (short)nonNullVarlenCols);
-
- varlenTblOff = baseOff + Row.TOTAL_LEN_FIELD_SIZE + Row.VARLEN_TABLE_SIZE_FIELD_SIZE;
+ varlenTblOff = baseOff + BinaryRow.TOTAL_LEN_FIELD_SIZE + BinaryRow.VARLEN_TABLE_SIZE_FIELD_SIZE;
nullMapOff = varlenTblOff + varlenTableSize(nonNullVarlenCols);
curOff = nullMapOff + curCols.nullMapSize();
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
index 7eff0dd..15c0962 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/SchemaDescriptor.java
@@ -51,8 +51,8 @@ public class SchemaDescriptor {
colMap = new HashMap<>(keyCols.length + valCols.length);
- Arrays.stream(keyCols).forEach(c -> colMap.put(c.name(), c));
- Arrays.stream(valCols).forEach(c -> colMap.put(c.name(), c));
+ Arrays.stream(this.keyCols.columns()).forEach(c -> colMap.put(c.name(), c));
+ Arrays.stream(this.valCols.columns()).forEach(c -> colMap.put(c.name(), c));
}
/**
@@ -66,7 +66,7 @@ public class SchemaDescriptor {
* @param idx Index to check.
* @return {@code true} if the column belongs to the key chunk.
*/
- public boolean keyColumn(int idx) {
+ public boolean isKeyColumn(int idx) {
return idx < keyCols.length();
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java
index 405f30e..1c5f063 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/builder/SchemaTableBuilderImpl.java
@@ -64,7 +64,7 @@ public class SchemaTableBuilderImpl implements SchemaTableBuilder {
@Override public SchemaTableBuilderImpl columns(Column... columns) {
for (int i = 0; i < columns.length; i++) {
if (this.columns.put(columns[i].name(), columns[i]) != null)
- throw new IllegalStateException("Column with same name already exists: columnName=" + columns[i].name());
+ throw new IllegalArgumentException("Column with same name already exists: columnName=" + columns[i].name());
}
return this;
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
index 02dfe3c..5b7e420 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/AbstractSerializer.java
@@ -50,21 +50,21 @@ public abstract class AbstractSerializer implements Serializer {
/** {@inheritDoc} */
@Override public <K> K deserializeKey(byte[] data) throws SerializationException {
- final Row row = new ByteBufferRow(schema, data);
+ final Row row = new Row(schema, new ByteBufferRow(data));
return (K)deserializeKey0(row);
}
/** {@inheritDoc} */
@Override public <V> V deserializeValue(byte[] data) throws SerializationException {
- final Row row = new ByteBufferRow(schema, data);
+ final Row row = new Row(schema, new ByteBufferRow(data));
return (V)deserializeValue0(row);
}
/** {@inheritDoc} */
@Override public <K, V> Pair<K, V> deserialize(byte[] data) throws SerializationException {
- final Row row = new ByteBufferRow(schema, data);
+ final Row row = new Row(schema, new ByteBufferRow(data));
return new Pair<>((K)deserializeKey0(row), (V)deserializeValue0(row));
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
index 883c986..b149c00 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/MarshallerUtil.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema.marshaller;
import java.util.BitSet;
import java.util.UUID;
+import org.apache.ignite.internal.schema.InvalidTypeException;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.schema.RowAssembler;
import org.apache.ignite.internal.util.ObjectFactory;
@@ -43,7 +44,7 @@ public final class MarshallerUtil {
return RowAssembler.utf8EncodedLength((CharSequence)val);
default:
- throw new IllegalStateException("Unsupported test varsize type: " + type);
+ throw new InvalidTypeException("Unsupported variable-length type: " + type);
}
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java
index f87f91a..25d7f2a 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.schema.marshaller;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+
/**
* Serialization exception.
*/
-public class SerializationException extends Exception {
+public class SerializationException extends IgniteInternalCheckedException {
/**
* Constructor.
*
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
index f55d09b..34998ab 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/AsmSerializerGenerator.java
@@ -50,6 +50,7 @@ import org.apache.ignite.internal.schema.marshaller.SerializationException;
import org.apache.ignite.internal.schema.marshaller.Serializer;
import org.apache.ignite.internal.schema.marshaller.SerializerFactory;
import org.apache.ignite.internal.util.ObjectFactory;
+import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteLogger;
/**
@@ -122,7 +123,7 @@ public class AsmSerializerGenerator implements SerializerFactory {
}
catch (Exception | LinkageError e) {
- throw new IllegalStateException("Failed to create serializer for key-value pair: schemaVer=" + schema.version() +
+ throw new IgniteInternalException("Failed to create serializer for key-value pair: schemaVer=" + schema.version() +
", keyClass=" + keyClass.getSimpleName() + ", valueClass=" + valClass.getSimpleName(), e);
}
}
@@ -336,7 +337,7 @@ public class AsmSerializerGenerator implements SerializerFactory {
methodDef.getBody().append(new IfStatement().condition(BytecodeExpressions.isNull(asm)).ifTrue(
new BytecodeBlock()
- .append(BytecodeExpressions.newInstance(IllegalStateException.class, BytecodeExpressions.constantString("ASM can't be null.")))
+ .append(BytecodeExpressions.newInstance(IgniteInternalException.class, BytecodeExpressions.constantString("ASM can't be null.")))
.throwObject()
));
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/ColumnAccessCodeGenerator.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/ColumnAccessCodeGenerator.java
index 87cb0f2..9e72e87 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/ColumnAccessCodeGenerator.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/ColumnAccessCodeGenerator.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.schema.marshaller.asm;
import java.util.BitSet;
import java.util.UUID;
import org.apache.ignite.internal.schema.marshaller.BinaryMode;
+import org.apache.ignite.lang.IgniteInternalException;
/**
* Row access code generator.
@@ -66,7 +67,7 @@ public class ColumnAccessCodeGenerator {
return new ColumnAccessCodeGenerator("bitmaskValue", "appendBitmask", BitSet.class, colIdx);
}
- throw new IllegalStateException("Unsupported binary mode: " + mode);
+ throw new IgniteInternalException("Unsupported binary mode: " + mode);
}
/** Reader handle name. */
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/ObjectMarshallerCodeGenerator.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/ObjectMarshallerCodeGenerator.java
index 0d2ae13..ab417d5 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/ObjectMarshallerCodeGenerator.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/asm/ObjectMarshallerCodeGenerator.java
@@ -36,6 +36,7 @@ import java.util.EnumSet;
import org.apache.ignite.internal.schema.Columns;
import org.apache.ignite.internal.schema.marshaller.MarshallerUtil;
import org.apache.ignite.internal.schema.marshaller.Serializer;
+import org.apache.ignite.lang.IgniteInternalException;
/**
* Generates {@link Serializer} methods code.
@@ -67,7 +68,7 @@ class ObjectMarshallerCodeGenerator implements MarshallerCodeGenerator {
}
}
catch (NoSuchFieldException ex) {
- throw new IllegalStateException(ex);
+ throw new IgniteInternalException(ex);
}
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/modification/TableModificationBuilderImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/modification/TableModificationBuilderImpl.java
index b1b875c..fa9bece 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/modification/TableModificationBuilderImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/modification/TableModificationBuilderImpl.java
@@ -43,7 +43,7 @@ public class TableModificationBuilderImpl implements TableModificationBuilder {
/** {@inheritDoc} */
@Override public TableModificationBuilder addColumn(Column column) {
if (table.hasColumn(column.name()))
- throw new IllegalStateException("Duplicate column: name='" + column.name() + '\'');
+ throw new IllegalArgumentException("Duplicate column: name='" + column.name() + '\'');
return this;
}
@@ -51,7 +51,7 @@ public class TableModificationBuilderImpl implements TableModificationBuilder {
/** {@inheritDoc} */
@Override public TableModificationBuilder addKeyColumn(Column column) {
if (table.hasColumn(column.name()))
- throw new IllegalStateException("Duplicate column: name=" + column.name() + '\'');
+ throw new IllegalArgumentException("Duplicate column: name='" + column.name() + '\''); // Name is quoted on both sides.
return this;
}
@@ -64,7 +64,7 @@ public class TableModificationBuilderImpl implements TableModificationBuilder {
/** {@inheritDoc} */
@Override public TableModificationBuilder dropColumn(String columnName) {
if (table.hasKeyColumn(columnName))
- throw new IllegalStateException("Can't drop key column: name=" + columnName);
+ throw new IllegalArgumentException("Can't drop key column: name=" + columnName);
return this;
}
@@ -74,7 +74,7 @@ public class TableModificationBuilderImpl implements TableModificationBuilder {
assert !PrimaryIndex.PRIMARY_KEY_INDEX_NAME.equals(index.name());
if (table.indices().stream().anyMatch(i -> i.name().equals(index.name())))
- throw new IllegalStateException("Index already exists: name=" + index.name() + '\'');
+ throw new IllegalArgumentException("Index already exists: name='" + index.name() + '\''); // Name is quoted on both sides.
return this;
}
@@ -82,7 +82,7 @@ public class TableModificationBuilderImpl implements TableModificationBuilder {
/** {@inheritDoc} */
@Override public TableModificationBuilder dropIndex(String indexName) {
if (PrimaryIndex.PRIMARY_KEY_INDEX_NAME.equals(indexName))
- throw new IllegalStateException("Can't drop primary key index: name=" + indexName);
+ throw new IllegalArgumentException("Can't drop primary key index: name=" + indexName);
return this;
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/util/ObjectFactory.java b/modules/schema/src/main/java/org/apache/ignite/internal/util/ObjectFactory.java
index 30f9365..0fcc84a 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/util/ObjectFactory.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/util/ObjectFactory.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.util;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
+import org.apache.ignite.lang.IgniteInternalException;
/**
* Object factory.
@@ -38,17 +39,17 @@ public class ObjectFactory<T> implements Factory<T> {
cnstr.setAccessible(true);
}
catch (NoSuchMethodException e) {
- throw new IllegalStateException("Class has no default constructor: class=" + tClass.getName(), e);
+ throw new IgniteInternalException("Class has no default constructor: class=" + tClass.getName(), e);
}
}
/** {@inheritDoc} */
- @Override public T create() throws IllegalStateException {
+ @Override public T create() throws IgniteInternalException {
try {
return cnstr.newInstance();
}
catch (IllegalAccessException | InvocationTargetException | InstantiationException e) {
- throw new IllegalStateException("Failed to instantiate class: " + cnstr.getDeclaringClass().getName(), e);
+ throw new IgniteInternalException("Failed to instantiate class: " + cnstr.getDeclaringClass().getName(), e);
}
}
}
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java
index 6168aff..e50598d 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/RowTest.java
@@ -211,7 +211,7 @@ public class RowTest {
if (vals[i] != null && !type.fixedLength()) {
if (type == NativeTypeSpec.BYTES) {
byte[] val = (byte[])vals[i];
- if (schema.keyColumn(i)) {
+ if (schema.isKeyColumn(i)) {
nonNullVarLenKeyCols++;
nonNullVarLenKeySize += val.length;
}
@@ -221,7 +221,7 @@ public class RowTest {
}
}
else if (type == NativeTypeSpec.STRING) {
- if (schema.keyColumn(i)) {
+ if (schema.isKeyColumn(i)) {
nonNullVarLenKeyCols++;
nonNullVarLenKeySize += RowAssembler.utf8EncodedLength((CharSequence)vals[i]);
}
@@ -231,7 +231,7 @@ public class RowTest {
}
}
else
- throw new IllegalStateException("Unsupported test varlen type: " + type);
+ throw new IllegalStateException("Unsupported variable-length type: " + type);
}
}
@@ -296,7 +296,7 @@ public class RowTest {
byte[] data = asm.build();
- ByteBufferRow tup = new ByteBufferRow(schema, data);
+ Row tup = new Row(schema, new ByteBufferRow(data));
for (int i = 0; i < vals.length; i++) {
Column col = schema.column(i);
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/TestUtils.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/TestUtils.java
index 69e9682..9a84e71 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/TestUtils.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/TestUtils.java
@@ -67,7 +67,7 @@ public final class TestUtils {
}
default:
- throw new IllegalStateException("Unsupported type: " + type);
+ throw new IllegalArgumentException("Unsupported type: " + type);
}
}
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/JavaSerializerTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/JavaSerializerTest.java
index c117749..1f39361 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/JavaSerializerTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/marshaller/JavaSerializerTest.java
@@ -44,6 +44,7 @@ import org.apache.ignite.internal.schema.TestUtils;
import org.apache.ignite.internal.schema.marshaller.asm.AsmSerializerGenerator;
import org.apache.ignite.internal.schema.marshaller.reflection.JavaSerializerFactory;
import org.apache.ignite.internal.util.ObjectFactory;
+import org.apache.ignite.lang.IgniteInternalException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DynamicNode;
import org.junit.jupiter.api.TestFactory;
@@ -266,7 +267,7 @@ public class JavaSerializerTest {
final Object key = WrongTestObject.randomObject(rnd);
final Object val = WrongTestObject.randomObject(rnd);
- assertThrows(IllegalStateException.class, () -> factory.create(schema, key.getClass(), val.getClass()));
+ assertThrows(IgniteInternalException.class, () -> factory.create(schema, key.getClass(), val.getClass()));
}
/**
diff --git a/modules/table/pom.xml b/modules/table/pom.xml
index 5e5e6dc..1f58901 100644
--- a/modules/table/pom.xml
+++ b/modules/table/pom.xml
@@ -48,6 +48,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.ignite</groupId>
+ <artifactId>ignite-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- Test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/storage/TableStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
similarity index 56%
rename from modules/table/src/main/java/org/apache/ignite/internal/storage/TableStorage.java
rename to modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
index 1d998b5..7668499 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/storage/TableStorage.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/KVSerializer.java
@@ -15,27 +15,31 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.storage;
+package org.apache.ignite.internal.schema.marshaller;
-import org.apache.ignite.internal.table.TableRow;
+import org.apache.ignite.internal.schema.Row;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
/**
- * Table storage facade.
+ * Key-value marshaller interface.
*/
-public interface TableStorage {
+public interface KVSerializer<K, V> {
/**
- * Gets row from storage.
- *
- * @param keyRow Row with key columns set.
- * @return Row with all columns set.
+ * @param obj Object to serialize.
+ * @return Table row with columns set from given object.
*/
- public TableRow get(TableRow keyRow);
+ Row serialize(@NotNull K obj, V val);
/**
- * Puts row from storage.
- *
- * @param row Row.
- * @return Replaced row or {@code null}.
+ * @param row Table row.
+ * @return Deserialized key object.
*/
- TableRow put(TableRow row);
+ @NotNull K deserializeKey(@NotNull Row row);
+
+ /**
+ * @param row Table row.
+ * @return Deserialized value object.
+ */
+ @Nullable V deserializeValue(@NotNull Row row);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/Marshaller.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/RecordSerializer.java
similarity index 54%
rename from modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/Marshaller.java
rename to modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/RecordSerializer.java
index 0af432c..08c676e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/Marshaller.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/RecordSerializer.java
@@ -17,51 +17,24 @@
package org.apache.ignite.internal.schema.marshaller;
-import org.apache.ignite.internal.table.TableRow;
-import org.apache.ignite.table.Tuple;
+import org.apache.ignite.internal.schema.Row;
import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
/**
- * Marshaller interface.
+ * Record serializer interface.
*/
-public interface Marshaller {
+public interface RecordSerializer<R> {
/**
- * @param obj Object to serialize.
+ * @param red Record to serialize.
* @return Table row with columns set from given object.
*/
- <T> TableRow serialize(@NotNull T obj);
-
- /**
- * @param tuple Record tuple.
- * @return Table row with columns set from given tuples.
- */
- TableRow marshalRecord(@NotNull Tuple tuple);
-
- /**
- * @param keyTuple Key tuple.
- * @param valTuple Value tuple.
- * @return Table row with columns set from given tuples.
- */
- TableRow marshalKVPair(@NotNull Tuple keyTuple, @Nullable Tuple valTuple);
-
- /**
- * @param row Table row.
- * @return Deserialized key object.
- */
- <K> @NotNull K deserializeKey(@NotNull TableRow row);
-
- /**
- * @param row Table row.
- * @return Deserialized value object.
- */
- <V> @Nullable V deserializeValue(@NotNull TableRow row);
+ Row serialize(@NotNull R red);
/**
* @param row Table row.
* @return Deserialized record object.
*/
- <R> R deserializeToRecord(@NotNull TableRow row);
+ R deserialize(@NotNull Row row);
/**
* Deserializes row and fills given record object fields.
@@ -70,5 +43,5 @@ public interface Marshaller {
* @param rec Record object to fill.
* @return Given record with filled fields from the given row.
*/
- <R> R deserializeToRecord(@NotNull TableRow row, @NotNull R rec);
+ R deserialize(@NotNull Row row, @NotNull R rec);
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
similarity index 53%
copy from modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java
copy to modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
index f87f91a..8eff2ca 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/schema/marshaller/TupleMarshaller.java
@@ -17,26 +17,29 @@
package org.apache.ignite.internal.schema.marshaller;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
/**
- * Serialization exception.
+ * Marshaller interface.
+ *
+ * @apiNote For key tuple marshalling, a {@code marshal(key, null)} method call must be used.
+ * A {@code marshal(key)} call may return the same result, but it validates value columns even if they are not specified.
+ *
*/
-public class SerializationException extends Exception {
+public interface TupleMarshaller {
/**
- * Constructor.
- *
- * @param cause Cause.
+ * @param tuple Record tuple.
+ * @return Table row with columns set from given tuples.
*/
- public SerializationException(Throwable cause) {
- super(cause);
- }
+ Row marshal(@NotNull Tuple tuple);
/**
- * Constructor.
- *
- * @param message Message.
- * @param cause Cause.
+ * @param keyTuple Key tuple.
+ * @param valTuple Value tuple.
+ * @return Table row with columns set from given tuples.
*/
- public SerializationException(String message, Throwable cause) {
- super(message, cause);
- }
+ Row marshal(@NotNull Tuple keyTuple, @Nullable Tuple valTuple);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
new file mode 100644
index 0000000..d5f143e
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/AbstractTableView.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Base class for Table views.
+ */
+abstract class AbstractTableView {
+ /** Internal table. */
+ protected final InternalTable tbl;
+
+ /** Schema manager. */
+ protected final TableSchemaManager schemaMgr;
+
+ /**
+ * Constructor.
+ * @param tbl Internal table.
+ * @param schemaMgr Schema manager.
+ */
+ protected AbstractTableView(InternalTable tbl, TableSchemaManager schemaMgr) {
+ this.tbl = tbl;
+ this.schemaMgr = schemaMgr;
+ }
+
+ /**
+ * Waits for operation completion.
+ *
+ * @param fut Future to wait for.
+ * @return Future result.
+ */
+ protected <T> T sync(CompletableFuture<T> fut) {
+ try {
+ return fut.get();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // Restore interrupt flag.
+
+ //TODO: IGNITE-14500 Replace with public exception with an error code.
+ throw new IgniteInternalException(e);
+ }
+ catch (ExecutionException e) {
+ //TODO: IGNITE-14500 Replace with public exception with an error code (or unwrap?).
+ throw new IgniteInternalException(e);
+ }
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
new file mode 100644
index 0000000..1bddfed
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Internal table facade provides low-level methods for table operations.
+ * The facade hides TX/replication protocol over table storage abstractions.
+ */
+public interface InternalTable {
+ /**
+ * Asynchronously gets a row with same key columns values as given one from the table.
+ *
+ * @param keyRow Row with key columns set.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow);
+
+ /**
+ * Asynchronously gets rows from the table.
+ *
+ * @param keyRows Rows with key columns set.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows);
+
+ /**
+ * Asynchronously inserts a row into the table if it does not exist or replaces the existing one.
+ *
+ * @param row Row to insert into the table.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Void> upsert(BinaryRow row);
+
+ /**
+ * Asynchronously inserts rows into the table if they do not exist or replaces the existing ones.
+ *
+ * @param rows Rows to insert into the table.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows);
+
+ /**
+ * Asynchronously inserts a row into the table, or replaces it if it exists, and returns the replaced row.
+ *
+ * @param row Row to insert into the table.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<BinaryRow> getAndUpsert(BinaryRow row);
+
+ /**
+ * Asynchronously inserts a row into the table if it does not exist.
+ *
+ * @param row Row to insert into the table.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Boolean> insert(BinaryRow row);
+
+ /**
+ * Asynchronously inserts rows that do not exist into the table, skipping existing ones.
+ *
+ * @param rows Rows to insert into the table.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows);
+
+ /**
+ * Asynchronously replaces an existing row that has the same key column values as the given one.
+ *
+ * @param row Row to replace with.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Boolean> replace(BinaryRow row);
+
+ /**
+ * Asynchronously replaces an expected row in the table with the given new one.
+ *
+ * @param oldRow Row to replace.
+ * @param newRow Row to replace with.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Boolean> replace(BinaryRow oldRow, BinaryRow newRow);
+
+ /**
+ * Asynchronously gets an existing row that has the same key column values as the given one,
+ * then replaces it with the given one.
+ *
+ * @param row Row to replace with.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<BinaryRow> getAndReplace(BinaryRow row);
+
+ /**
+ * Asynchronously deletes a row with the same key columns values as the given one from the table.
+ *
+ * @param keyRow Row with key columns set.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Boolean> delete(BinaryRow keyRow);
+
+ /**
+ * Asynchronously deletes given row from the table.
+ *
+ * @param oldRow Row to delete.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Boolean> deleteExact(BinaryRow oldRow);
+
+ /**
+ * Asynchronously gets then deletes a row with the same key columns values from the table.
+ *
+ * @param row Row with key columns set.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<BinaryRow> getAndDelete(BinaryRow row);
+
+ /**
+ * Asynchronously removes rows with the same key column values as the given ones from the table.
+ *
+ * @param rows Rows with key columns set.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows);
+
+ /**
+ * Asynchronously removes the given rows from the table.
+ *
+ * @param rows Rows to delete.
+ * @return Future representing pending completion of the operation.
+ */
+ @NotNull CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows);
+
+ //TODO: IGNITE-14488. Add invoke() methods.
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
new file mode 100644
index 0000000..f27642a
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.table.InvokeProcessor;
+import org.apache.ignite.table.KeyValueBinaryView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.TupleBuilder;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Key-value view implementation for binary user-object representation.
+ *
+ * @implNote Key-value {@link Tuple}s represent marshalled user-objects
+ * regarding the binary object concept.
+ */
+public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinaryView {
+
+ /** Marshaller. */
+ private final TupleMarshallerImpl marsh;
+
+ /**
+ * Constructor.
+ *
+ * @param tbl Table storage.
+ * @param schemaMgr Schema manager.
+ */
+ public KVBinaryViewImpl(InternalTable tbl, TableSchemaManager schemaMgr) {
+ super(tbl, schemaMgr);
+
+ marsh = new TupleMarshallerImpl(schemaMgr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Tuple get(Tuple key) {
+ return sync(getAsync(key));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Tuple> getAsync(Tuple key) {
+ Objects.requireNonNull(key);
+
+ Row kRow = marshaller().marshal(key, null); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.get(kRow) // Load async.
+ .thenApply(this::wrap) // Binary -> schema-aware row
+ .thenApply(t -> t == null ? null : t.valueChunk()); // Narrow to value.
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<Tuple, Tuple> getAll(Collection<Tuple> keys) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Map<Tuple, Tuple>> getAllAsync(Collection<Tuple> keys) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean contains(Tuple key) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void put(Tuple key, Tuple val) {
+ sync(putAsync(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Void> putAsync(Tuple key, Tuple val) {
+ Objects.requireNonNull(key);
+
+ Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.upsert(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void putAll(Map<Tuple, Tuple> pairs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Void> putAllAsync(Map<Tuple, Tuple> pairs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Tuple getAndPut(Tuple key, Tuple val) {
+ return sync(getAndPutAsync(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Tuple> getAndPutAsync(Tuple key, Tuple val) {
+ Objects.requireNonNull(key);
+
+ Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.getAndUpsert(row)
+ .thenApply(this::wrap) // Binary -> schema-aware row
+ .thenApply(t -> t == null ? null : t.valueChunk()); // Narrow to value.
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean putIfAbsent(Tuple key, Tuple val) {
+ return sync(putIfAbsentAsync(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(Tuple key, Tuple val) {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(val);
+
+ Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.insert(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(Tuple key) {
+ return sync(removeAsync(key));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> removeAsync(Tuple key) {
+ Objects.requireNonNull(key);
+
+ Row row = marshaller().marshal(key, null); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.delete(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean remove(Tuple key, Tuple val) {
+ return sync(removeAsync(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> removeAsync(Tuple key, Tuple val) {
+ Objects.requireNonNull(key);
+ Objects.requireNonNull(val);
+
+ Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.deleteExact(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public Tuple getAndRemove(Tuple key) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(Tuple key) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(Tuple key, Tuple val) {
+ return sync(replaceAsync(key, val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple key, Tuple val) {
+ Objects.requireNonNull(key);
+
+ Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.replace(row);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean replace(Tuple key, Tuple oldVal, Tuple newVal) {
+ return sync(replaceAsync(key, oldVal, newVal));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple key, Tuple oldVal, Tuple newVal) {
+ Objects.requireNonNull(key);
+
+ Row oldRow = marshaller().marshal(key, oldVal); // Convert to portable format to pass TX/storage layer.
+ Row newRow = marshaller().marshal(key, newVal); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.replace(oldRow, newRow);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Tuple getAndReplace(Tuple key, Tuple val) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(Tuple key, Tuple val) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R extends Serializable> R invoke(
+ Tuple key,
+ InvokeProcessor<Tuple, Tuple, R> proc,
+ Serializable... args
+ ) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
+ Tuple key,
+ InvokeProcessor<Tuple, Tuple, R> proc,
+ Serializable... args
+ ) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R extends Serializable> Map<Tuple, R> invokeAll(
+ Collection<Tuple> keys,
+ InvokeProcessor<Tuple, Tuple, R> proc,
+ Serializable... args
+ ) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public @NotNull <R extends Serializable> CompletableFuture<Map<Tuple, R>> invokeAllAsync(
+ Collection<Tuple> keys,
+ InvokeProcessor<Tuple, Tuple, R> proc,
+ Serializable... args
+ ) {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public TupleBuilder tupleBuilder() {
+ return new TupleBuilderImpl();
+ }
+
+ /**
+ * @return Marshaller.
+ */
+ private TupleMarshaller marshaller() {
+ return marsh;
+ }
+
+ /**
+ * @param row Binary row.
+ * @return Table row.
+ */
+ protected TableRow wrap(BinaryRow row) {
+ if (row == null)
+ return null;
+
+ final SchemaDescriptor schema = schemaMgr.schema(row.schemaVersion());
+
+ return new TableRow(schema, new Row(schema, row));
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
index c057911..b0642fd 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
@@ -20,9 +20,12 @@ package org.apache.ignite.internal.table;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import org.apache.ignite.internal.schema.marshaller.Marshaller;
-import org.apache.ignite.internal.storage.TableStorage;
-import org.apache.ignite.lang.IgniteFuture;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.KVSerializer;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.mapper.KeyMapper;
@@ -32,174 +35,175 @@ import org.jetbrains.annotations.NotNull;
/**
* Key-value view implementation.
*/
-public class KVViewImpl<K, V> implements KeyValueView<K, V> {
- /** Underlying storage. */
- private final TableStorage tbl;
-
+public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<K, V> {
/**
* Constructor.
*
* @param tbl Table storage.
+ * @param schemaMgr Schema manager.
* @param keyMapper Key class mapper.
* @param valueMapper Value class mapper.
*/
- public KVViewImpl(TableStorage tbl, KeyMapper<K> keyMapper, ValueMapper<V> valueMapper) {
- this.tbl = tbl;
+ public KVViewImpl(InternalTable tbl, TableSchemaManager schemaMgr, KeyMapper<K> keyMapper,
+ ValueMapper<V> valueMapper) {
+ super(tbl, schemaMgr);
}
/** {@inheritDoc} */
@Override public V get(K key) {
- final Marshaller marsh = marshaller();
+ return sync(getAsync(key));
+ }
- TableRow kRow = marsh.serialize(key);
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<V> getAsync(K key) {
+ Objects.requireNonNull(key);
- TableRow row = tbl.get(kRow);
+ final KVSerializer<K, V> marsh = marshaller();
- return marsh.deserializeValue(row);
- }
+ Row kRow = marsh.serialize(key, null); // Convert to portable format to pass TX/storage layer.
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<V> getAsync(K key) {
- return null;
+ return tbl.get(kRow)
+ .thenApply(this::wrap) // Binary -> schema-aware row
+ .thenApply(marsh::deserializeValue); // row -> deserialized obj.
}
/** {@inheritDoc} */
@Override public Map<K, V> getAll(Collection<K> keys) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
- return null;
+ @Override public @NotNull CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean contains(K key) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public void put(K key, V val) {
-
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Void> putAsync(K key, V val) {
- return null;
+ @Override public @NotNull CompletableFuture<Void> putAsync(K key, V val) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public void putAll(Map<K, V> pairs) {
-
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Void> putAllAsync(Map<K, V> pairs) {
- return null;
+ @Override public @NotNull CompletableFuture<Void> putAllAsync(Map<K, V> pairs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public V getAndPut(K key, V val) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<V> getAndPutAsync(K key, V val) {
- return null;
+ @Override public @NotNull CompletableFuture<V> getAndPutAsync(K key, V val) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean putIfAbsent(K key, V val) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> putIfAbsentAsync(K key, V val) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(K key, V val) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean remove(K key) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> removeAsync(K key) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> removeAsync(K key) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean remove(K key, V val) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> removeAsync(K key, V val) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> removeAsync(K key, V val) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public Collection<K> removeAll(Collection<K> keys) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<K> removeAllAsync(Collection<K> keys) {
- return null;
+ @Override public @NotNull CompletableFuture<K> removeAllAsync(Collection<K> keys) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public V getAndRemove(K key) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<V> getAndRemoveAsync(K key) {
- return null;
+ @Override public @NotNull CompletableFuture<V> getAndRemoveAsync(K key) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean replace(K key, V val) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> replaceAsync(K key, V val) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(K key, V val) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean replace(K key, V oldVal, V newVal) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public V getAndReplace(K key, V val) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<V> getAndReplaceAsync(K key, V val) {
- return null;
+ @Override public @NotNull CompletableFuture<V> getAndReplaceAsync(K key, V val) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public <R extends Serializable> R invoke(K key, InvokeProcessor<K, V, R> proc, Serializable... args) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull <R extends Serializable> IgniteFuture<R> invokeAsync(
+ @Override public @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
K key,
InvokeProcessor<K, V, R> proc,
Serializable... args
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@@ -208,21 +212,34 @@ public class KVViewImpl<K, V> implements KeyValueView<K, V> {
InvokeProcessor<K, V, R> proc,
Serializable... args
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull <R extends Serializable> IgniteFuture<Map<K, R>> invokeAllAsync(
+ @Override public @NotNull <R extends Serializable> CompletableFuture<Map<K, R>> invokeAllAsync(
Collection<K> keys,
InvokeProcessor<K, V, R> proc, Serializable... args
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/**
* @return Marshaller.
*/
- private Marshaller marshaller() {
- return null; // table.schemaManager().marshaller();
+ private KVSerializer<K, V> marshaller() {
+ throw new UnsupportedOperationException("Not implemented yet.");
+ }
+
+ /**
+ * @param row Binary row.
+ * @return Schema-aware row.
+ */
+ private Row wrap(BinaryRow row) {
+ if (row == null)
+ return null;
+
+ final SchemaDescriptor rowSchema = schemaMgr.schema(row.schemaVersion()); // Get a schema for row.
+
+ return new Row(rowSchema, row);
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
deleted file mode 100644
index e739985..0000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KeyValueBinaryViewImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Objects;
-import org.apache.ignite.internal.schema.marshaller.Marshaller;
-import org.apache.ignite.internal.storage.TableStorage;
-import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.table.InvokeProcessor;
-import org.apache.ignite.table.KeyValueBinaryView;
-import org.apache.ignite.table.Tuple;
-import org.apache.ignite.table.TupleBuilder;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Key-value view implementation for binary user-object representation.
- *
- * @implNote Key-value {@link Tuple}s represents marshalled user-objects
- * regarding the binary object concept.
- */
-public class KeyValueBinaryViewImpl implements KeyValueBinaryView {
- /** Underlying storage. */
- private final TableStorage tbl;
-
- /**
- * Constructor.
- *
- * @param tbl Table storage.
- */
- public KeyValueBinaryViewImpl(TableStorage tbl) {
- this.tbl = tbl;
- }
-
- /** {@inheritDoc} */
- @Override public Tuple get(Tuple key) {
- Objects.requireNonNull(key);
-
- return marshaller().marshalKVPair(key, null);
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> getAsync(Tuple key) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Map<Tuple, Tuple> getAll(Collection<Tuple> keys) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Map<Tuple, Tuple>> getAllAsync(Collection<Tuple> keys) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean contains(Tuple key) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public void put(Tuple key, Tuple val) {
- Objects.requireNonNull(key);
-
- final TableRow row = marshaller().marshalKVPair(key, val);
-
- tbl.put(row);
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Void> putAsync(Tuple key, Tuple val) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void putAll(Map<Tuple, Tuple> pairs) {
-
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Void> putAllAsync(Map<Tuple, Tuple> pairs) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Tuple getAndPut(Tuple key, Tuple val) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> getAndPutAsync(Tuple key, Tuple val) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean putIfAbsent(Tuple key, Tuple val) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> putIfAbsentAsync(Tuple key, Tuple val) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean remove(Tuple key) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> removeAsync(Tuple key) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean remove(Tuple key, Tuple val) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> removeAsync(Tuple key, Tuple val) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Tuple getAndRemove(Tuple key) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> getAndRemoveAsync(Tuple key) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean replace(Tuple key, Tuple val) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> replaceAsync(Tuple key, Tuple val) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean replace(Tuple key, Tuple oldVal, Tuple newVal) {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> replaceAsync(Tuple key, Tuple oldVal,
- Tuple newVal) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Tuple getAndReplace(Tuple key, Tuple val) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> getAndReplaceAsync(Tuple key, Tuple val) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public <R extends Serializable> R invoke(
- Tuple key,
- InvokeProcessor<Tuple, Tuple, R> proc,
- Serializable... args
- ) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull <R extends Serializable> IgniteFuture<R> invokeAsync(
- Tuple key,
- InvokeProcessor<Tuple, Tuple, R> proc,
- Serializable... args
- ) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public <R extends Serializable> Map<Tuple, R> invokeAll(
- Collection<Tuple> keys,
- InvokeProcessor<Tuple, Tuple, R> proc,
- Serializable... args
- ) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public @NotNull <R extends Serializable> IgniteFuture<Map<Tuple, R>> invokeAllAsync(
- Collection<Tuple> keys,
- InvokeProcessor<Tuple, Tuple, R> proc,
- Serializable... args
- ) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public TupleBuilder tupleBuilder() {
- return null;
- }
-
- /**
- * @return Marshaller.
- */
- private Marshaller marshaller() {
- return null;
- }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index 60464fa..686c328 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -20,9 +20,12 @@ package org.apache.ignite.internal.table;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import org.apache.ignite.internal.schema.marshaller.Marshaller;
-import org.apache.ignite.internal.storage.TableStorage;
-import org.apache.ignite.lang.IgniteFuture;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.RecordSerializer;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.RecordView;
import org.apache.ignite.table.mapper.RecordMapper;
@@ -31,196 +34,203 @@ import org.jetbrains.annotations.NotNull;
/**
* Record view implementation.
*/
-public class RecordViewImpl<R> implements RecordView<R> {
- /** Table */
- private final TableStorage tbl;
-
+public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R> {
/**
* Constructor.
*
* @param tbl Table.
+ * @param schemaMgr Schema manager.
* @param mapper Record class mapper.
*/
- public RecordViewImpl(TableStorage tbl, RecordMapper<R> mapper) {
- this.tbl = tbl;
+ public RecordViewImpl(InternalTable tbl, TableSchemaManager schemaMgr, RecordMapper<R> mapper) {
+ super(tbl, schemaMgr);
}
/** {@inheritDoc} */
- @Override public R get(R keyRec) {
- Marshaller marsh = marshaller();
+ @Override public R fill(R recObjToFill) {
+ return sync(fillAsync(recObjToFill));
+ }
+
+ /** {@inheritDoc} */
+ @Override public CompletableFuture<R> fillAsync(R recObjToFill) {
+ Objects.requireNonNull(recObjToFill);
- TableRow kRow = marsh.serialize(keyRec);
+ RecordSerializer<R> marsh = serializer();
- TableRow tRow = tbl.get(kRow);
+ Row kRow = marsh.serialize(recObjToFill); // Convert to portable format to pass TX/storage layer.
- return marsh.deserializeToRecord(tRow);
+ return tbl.get(kRow) // Load async.
+ .thenApply(this::wrap) // Binary -> schema-aware row
+ .thenApply(r -> marsh.deserialize(r, recObjToFill)); // Deserialize and fill record.
}
/** {@inheritDoc} */
- @Override public R fill(R recObjToFill) {
- Marshaller marsh = marshaller();
+ @Override public R get(R keyRec) {
+ return sync(getAsync(keyRec));
+ }
- TableRow kRow = marsh.serialize(recObjToFill);
+ /** {@inheritDoc} */
+ @Override public @NotNull CompletableFuture<R> getAsync(R keyRec) {
+ Objects.requireNonNull(keyRec);
- TableRow tRow = tbl.get(kRow);
+ RecordSerializer<R> marsh = serializer();
- return marsh.deserializeToRecord(tRow, recObjToFill);
- }
+ Row kRow = marsh.serialize(keyRec); // Convert to portable format to pass TX/storage layer.
- /** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<R> getAsync(R keyRec) {
- return null;
+ return tbl.get(kRow) // Load async.
+ .thenApply(this::wrap) // Binary -> schema-aware row
+ .thenApply(marsh::deserialize); // Deserialize.
}
/** {@inheritDoc} */
@Override public Collection<R> getAll(Collection<R> keyRecs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Collection<R>> getAllAsync(Collection<R> keyRecs) {
- return null;
+ @Override public @NotNull CompletableFuture<Collection<R>> getAllAsync(Collection<R> keyRecs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public void upsert(R rec) {
-
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Void> upsertAsync(R rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Void> upsertAsync(R rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public void upsertAll(Collection<R> recs) {
-
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Void> upsertAllAsync(Collection<R> recs) {
- return null;
+ @Override public @NotNull CompletableFuture<Void> upsertAllAsync(Collection<R> recs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public R getAndUpsert(R rec) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<R> getAndUpsertAsync(R rec) {
- return null;
+ @Override public @NotNull CompletableFuture<R> getAndUpsertAsync(R rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean insert(R rec) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> insertAsync(R rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> insertAsync(R rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public Collection<R> insertAll(Collection<R> recs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Collection<R>> insertAllAsync(Collection<R> recs) {
- return null;
+ @Override public @NotNull CompletableFuture<Collection<R>> insertAllAsync(Collection<R> recs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean replace(R rec) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> replaceAsync(R rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(R rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean replace(R oldRec, R newRec) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> replaceAsync(R oldRec, R newRec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(R oldRec, R newRec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public R getAndReplace(R rec) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<R> getAndReplaceAsync(R rec) {
- return null;
+ @Override public @NotNull CompletableFuture<R> getAndReplaceAsync(R rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean delete(R keyRec) {
- return false;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> deleteAsync(R keyRec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> deleteAsync(R keyRec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean deleteExact(R oldRec) {
- return false;
+ @Override public boolean deleteExact(R rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> deleteExactAsync(R oldRec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(R rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public R getAndDelete(R rec) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<R> getAndDeleteAsync(R rec) {
- return null;
+ @Override public @NotNull CompletableFuture<R> getAndDeleteAsync(R rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public Collection<R> deleteAll(Collection<R> recs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Collection<R>> deleteAllAsync(Collection<R> recs) {
- return null;
+ @Override public @NotNull CompletableFuture<Collection<R>> deleteAllAsync(Collection<R> recs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public Collection<R> deleteAllExact(Collection<R> recs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Collection<R>> deleteAllExactAsync(Collection<R> recs) {
- return null;
+ @Override public @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(Collection<R> recs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public <T extends Serializable> T invoke(R keyRec, InvokeProcessor<R, R, T> proc) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull <T extends Serializable> IgniteFuture<T> invokeAsync(R keyRec,
+ @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(R keyRec,
InvokeProcessor<R, R, T> proc) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@@ -228,22 +238,34 @@ public class RecordViewImpl<R> implements RecordView<R> {
Collection<R> keyRecs,
InvokeProcessor<R, R, T> proc
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull <T extends Serializable> IgniteFuture<Map<R, T>> invokeAllAsync(
+ @Override public @NotNull <T extends Serializable> CompletableFuture<Map<R, T>> invokeAllAsync(
Collection<R> keyRecs,
InvokeProcessor<R, R, T> proc
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/**
* @return Marshaller.
*/
- private Marshaller marshaller() {
- return null; // table.schemaManager().marshaller();
+ private RecordSerializer<R> serializer() {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
+ /**
+ * @param row Binary row.
+ * @return Schema-aware row.
+ */
+ private Row wrap(BinaryRow row) {
+ if (row == null)
+ return null;
+
+ final SchemaDescriptor rowSchema = schemaMgr.schema(row.schemaVersion()); // Get a schema for row.
+
+ return new Row(rowSchema, row);
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
index e0ff1a7..ecc312f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
@@ -21,12 +21,13 @@ import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjects;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.NotNull;
/**
* Row to RowChunk adapter.
*/
-public abstract class RowChunkAdapter implements RowChunk {
+public abstract class RowChunkAdapter implements Tuple {
/**
* @param colName Column name.
* @return Column.
@@ -39,9 +40,6 @@ public abstract class RowChunkAdapter implements RowChunk {
protected abstract Row row();
/** {@inheritDoc} */
- @Override public abstract byte[] toBytes();
-
- /** {@inheritDoc} */
@Override public <T> T value(String colName) {
final Column col = columnByName(colName);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index f64bb21..44b4743 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -20,9 +20,12 @@ package org.apache.ignite.internal.table;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import org.apache.ignite.internal.schema.marshaller.Marshaller;
-import org.apache.ignite.internal.storage.TableStorage;
-import org.apache.ignite.lang.IgniteFuture;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
import org.apache.ignite.table.InvokeProcessor;
import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.KeyValueView;
@@ -38,185 +41,219 @@ import org.jetbrains.annotations.NotNull;
/**
* Table view implementation for binary objects.
*/
-public class TableImpl implements Table {
- /** Table. */
- private final TableStorage tbl;
+public class TableImpl extends AbstractTableView implements Table {
+ /** Marshaller. */
+ private final TupleMarshallerImpl marsh;
/**
* Constructor.
*
* @param tbl Table.
*/
- public TableImpl(TableStorage tbl) {
- this.tbl = tbl;
+ public TableImpl(InternalTable tbl, TableSchemaManager schemaMgr) {
+ super(tbl, schemaMgr);
+
+ marsh = new TupleMarshallerImpl(schemaMgr);
}
/** {@inheritDoc} */
@Override public <R> RecordView<R> recordView(RecordMapper<R> recMapper) {
- return new RecordViewImpl<>(tbl, recMapper);
+ return new RecordViewImpl<>(tbl, schemaMgr, recMapper);
}
/** {@inheritDoc} */
@Override public <K, V> KeyValueView<K, V> kvView(KeyMapper<K> keyMapper, ValueMapper<V> valMapper) {
- return new KVViewImpl<>(tbl, keyMapper, valMapper);
+ return new KVViewImpl<>(tbl, schemaMgr, keyMapper, valMapper);
}
/** {@inheritDoc} */
@Override public KeyValueBinaryView kvView() {
- return new KeyValueBinaryViewImpl(tbl);
+ return new KVBinaryViewImpl(tbl, schemaMgr);
}
/** {@inheritDoc} */
@Override public Tuple get(Tuple keyRec) {
- Marshaller marsh = marshaller();
-
- return tbl.get(marsh.marshalRecord(keyRec));
+ return sync(getAsync(keyRec));
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> getAsync(Tuple keyRec) {
- return null;
+ @Override public @NotNull CompletableFuture<Tuple> getAsync(Tuple keyRec) {
+ Objects.requireNonNull(keyRec);
+
+ final Row keyRow = marshaller().marshal(keyRec, null); // Convert to portable format to pass TX/storage layer.
+
+ return tbl.get(keyRow).thenApply(this::wrap);
}
/** {@inheritDoc} */
@Override public Collection<Tuple> getAll(Collection<Tuple> keyRecs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Collection<Tuple>> getAllAsync(Collection<Tuple> keyRecs) {
- return null;
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(Collection<Tuple> keyRecs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public void upsert(Tuple rec) {
-
+ sync(upsertAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Void> upsertAsync(Tuple rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Void> upsertAsync(Tuple rec) {
+ Objects.requireNonNull(rec);
+
+ final Row keyRow = marshaller().marshal(rec);
+
+ return tbl.upsert(keyRow);
}
/** {@inheritDoc} */
@Override public void upsertAll(Collection<Tuple> recs) {
-
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Void> upsertAllAsync(Collection<Tuple> recs) {
- return null;
+ @Override public @NotNull CompletableFuture<Void> upsertAllAsync(Collection<Tuple> recs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public Tuple getAndUpsert(Tuple rec) {
- return null;
+ return sync(getAndUpsertAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> getAndUpsertAsync(Tuple rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(Tuple rec) {
+ Objects.requireNonNull(rec);
+
+ final Row keyRow = marshaller().marshal(rec);
+
+ return tbl.getAndUpsert(keyRow).thenApply(this::wrap);
}
/** {@inheritDoc} */
@Override public boolean insert(Tuple rec) {
- return false;
+ return sync(insertAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> insertAsync(Tuple rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> insertAsync(Tuple rec) {
+ Objects.requireNonNull(rec);
+
+ final Row keyRow = marshaller().marshal(rec);
+
+ return tbl.insert(keyRow);
}
/** {@inheritDoc} */
@Override public Collection<Tuple> insertAll(Collection<Tuple> recs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Collection<Tuple>> insertAllAsync(Collection<Tuple> recs) {
- return null;
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(Collection<Tuple> recs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean replace(Tuple rec) {
- return false;
+ return sync(replaceAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> replaceAsync(Tuple rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple rec) {
+ Objects.requireNonNull(rec);
+
+ final Row keyRow = marshaller().marshal(rec);
+
+ return tbl.replace(keyRow);
}
/** {@inheritDoc} */
@Override public boolean replace(Tuple oldRec, Tuple newRec) {
- return false;
+ return sync(replaceAsync(oldRec, newRec));
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> replaceAsync(Tuple oldRec, Tuple newRec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple oldRec, Tuple newRec) {
+ Objects.requireNonNull(oldRec);
+ Objects.requireNonNull(newRec);
+
+ final Row oldRow = marshaller().marshal(oldRec);
+ final Row newRow = marshaller().marshal(newRec);
+
+ return tbl.replace(oldRow, newRow);
}
/** {@inheritDoc} */
@Override public Tuple getAndReplace(Tuple rec) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> getAndReplaceAsync(Tuple rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(Tuple rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public boolean delete(Tuple keyRec) {
- return false;
+ return sync(deleteAsync(keyRec));
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> deleteAsync(Tuple keyRec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> deleteAsync(Tuple keyRec) {
+ Objects.requireNonNull(keyRec);
+
+ final Row keyRow = marshaller().marshal(keyRec, null);
+
+ return tbl.delete(keyRow);
}
/** {@inheritDoc} */
- @Override public boolean deleteExact(Tuple oldRec) {
- return false;
+ @Override public boolean deleteExact(Tuple rec) {
+ return sync(deleteExactAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Boolean> deleteExactAsync(Tuple oldRec) {
- return null;
+ @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(Tuple rec) {
+ Objects.requireNonNull(rec);
+
+ final Row row = marshaller().marshal(rec);
+
+ return tbl.deleteExact(row);
}
/** {@inheritDoc} */
@Override public Tuple getAndDelete(Tuple rec) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Tuple> getAndDeleteAsync(Tuple rec) {
- return null;
+ @Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(Tuple rec) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public Collection<Tuple> deleteAll(Collection<Tuple> recs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Collection<Tuple>> deleteAllAsync(Collection<Tuple> recs) {
- return null;
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(Collection<Tuple> recs) {
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public Collection<Tuple> deleteAllExact(Collection<Tuple> recs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull IgniteFuture<Collection<Tuple>> deleteAllExactAsync(
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllExactAsync(
Collection<Tuple> recs) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@@ -224,15 +261,15 @@ public class TableImpl implements Table {
Tuple keyRec,
InvokeProcessor<Tuple, Tuple, T> proc
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull <T extends Serializable> IgniteFuture<T> invokeAsync(
+ @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(
Tuple keyRec,
InvokeProcessor<Tuple, Tuple, T> proc
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@@ -240,26 +277,39 @@ public class TableImpl implements Table {
Collection<Tuple> keyRecs,
InvokeProcessor<Tuple, Tuple, T> proc
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull <T extends Serializable> IgniteFuture<Map<Tuple, T>> invokeAllAsync(
+ @Override public @NotNull <T extends Serializable> CompletableFuture<Map<Tuple, T>> invokeAllAsync(
Collection<Tuple> keyRecs,
InvokeProcessor<Tuple, Tuple, T> proc
) {
- return null;
+ throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public TupleBuilder tupleBuilder() {
- return null;
+ return new TupleBuilderImpl();
}
/**
* @return Marshaller.
*/
- private Marshaller marshaller() {
- return null;
+ private TupleMarshaller marshaller() {
+ return marsh;
+ }
+
+ /**
+ * @param row Binary row.
+ * @return Table row.
+ */
+ private TableRow wrap(BinaryRow row) {
+ if (row == null)
+ return null;
+
+ final SchemaDescriptor schema = schemaMgr.schema(row.schemaVersion());
+
+ return new TableRow(schema, new Row(schema, row));
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
index e00ca7b..b9955fc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRow.java
@@ -17,22 +17,107 @@
package org.apache.ignite.internal.table;
+import java.util.Objects;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
/**
- * Table binary row interface.
+ * Row to Tuple adapter.
+ *
+ * Provides methods to access columns values by column names.
*/
-public interface TableRow extends RowChunk {
+public class TableRow extends RowChunkAdapter {
+ /** Schema. */
+ private final SchemaDescriptor schema;
+
+ /** Row. */
+ private final Row row;
+
/**
- * @return Key chunk.
+ * Constructor.
+ *
+ * @param schema Schema descriptor.
+ * @param row Row.
*/
- RowChunk keyChunk();
+ public TableRow(SchemaDescriptor schema, Row row) {
+ assert schema.version() == row.schemaVersion();
+
+ this.schema = schema;
+ this.row = row;
+ }
+
+ /** {@inheritDoc} */
+ @Override @NotNull protected final Column columnByName(@NotNull String colName) {
+ Objects.requireNonNull(colName);
+
+ final Column col = schema.column(colName);
+
+ if (col == null)
+ throw new IllegalArgumentException("Invalid column name: columnName=" + colName + ", schemaVersion=" + schema.version());
+
+ return col;
+ }
/**
- * @return Value chunk.
+ * @return Key chunk.
*/
- RowChunk valueChunk();
+ public Tuple keyChunk() {
+ return new KeyRowChunk();
+ }
/**
- * @return Row schema version.
+ * @return Value chunk.
*/
- long schemaVersion();
+ public @Nullable Tuple valueChunk() {
+ return row.hasValue() ? new ValueRowChunk() : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Row row() {
+ return row;
+ }
+
+ /** Key column chunk. */
+ private class KeyRowChunk extends RowChunkAdapter {
+ /** {@inheritDoc} */
+ @Override protected Row row() {
+ return row;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected @NotNull Column columnByName(@NotNull String colName) {
+ Objects.requireNonNull(colName);
+
+ final Column col = schema.column(colName);
+
+ if (col == null || !schema.isKeyColumn(col.schemaIndex()))
+ throw new IllegalArgumentException("Invalid key column name: columnName=" + colName + ", schemaVersion=" + schema.version());
+
+ return col;
+ }
+ }
+
+ /** Value column chunk: a view over the same row that resolves value (non-key) columns only. */
+ private class ValueRowChunk extends RowChunkAdapter {
+ /** {@inheritDoc} */
+ @Override protected Row row() {
+ return row;
+ }
+
+ /** Resolves a value column by name; unknown names and key columns are rejected. */
+ @Override protected @NotNull Column columnByName(@NotNull String colName) {
+ Objects.requireNonNull(colName);
+
+ final Column col = schema.column(colName);
+
+ if (col == null || schema.isKeyColumn(col.schemaIndex()))
+ throw new IllegalArgumentException("Invalid value column name: columnName=" + colName + ", schemaVersion=" + schema.version());
+
+ return col;
+ }
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRowAdapter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableRowAdapter.java
deleted file mode 100644
index 75eeede..0000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableRowAdapter.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table;
-
-import java.util.Objects;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.jetbrains.annotations.NotNull;
-
-/**
- * Table row adapter for Row.
- */
-public class TableRowAdapter extends RowChunkAdapter implements TableRow {
- /** Schema. */
- private final SchemaDescriptor schema;
-
- /** Row. */
- private final Row row;
-
- /** Key chunk projection. */
- private final RowChunk keyChunk;
-
- /** Value chunk projection. */
- private final RowChunk valChunk;
-
- /**
- * Constructor.
- *
- * @param row Row.
- * @param schema Schema descriptor.
- */
- public TableRowAdapter(Row row, SchemaDescriptor schema) {
- this.schema = schema;
- this.row = row;
-
- keyChunk = new KeyRowChunk();
- valChunk = row.hasValue() ? new ValueRowChunk() : null;
- }
-
- /** {@inheritDoc} */
- @Override public long schemaVersion() {
- return schema.version();
- }
-
- /** {@inheritDoc} */
- @Override @NotNull protected final Column columnByName(@NotNull String colName) {
- Objects.requireNonNull(colName);
-
- final Column col = schema.column(colName);
-
- if (col == null)
- throw new IllegalArgumentException("Invalid column name: columnName=" + colName + ", schemaVersion=" + schema.version());
-
- return col;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] toBytes() {
- return row.rowBytes();
- }
-
- /** {@inheritDoc} */
- @Override public RowChunk keyChunk() {
- return keyChunk;
- }
-
- /** {@inheritDoc} */
- @Override public RowChunk valueChunk() {
- return valChunk;
- }
-
- /** {@inheritDoc} */
- @Override protected Row row() {
- return row;
- }
-
- /** Key column chunk. */
- private class KeyRowChunk extends RowChunkAdapter {
- /** {@inheritDoc} */
- @Override protected Row row() {
- return row;
- }
-
- /** {@inheritDoc} */
- @Override protected @NotNull Column columnByName(@NotNull String colName) {
- Objects.requireNonNull(colName);
-
- final Column col = schema.column(colName);
-
- if (col == null || !schema.keyColumn(col.schemaIndex()))
- throw new IllegalArgumentException("Invalid key column name: columnName=" + colName + ", schemaVersion=" + schema.version());
-
- return col;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] toBytes() {
- return row.keyChunkBytes();
- }
- }
-
- /** Value column chunk. */
- private class ValueRowChunk extends RowChunkAdapter {
- /** {@inheritDoc} */
- @Override protected Row row() {
- return row;
- }
-
- /** {@inheritDoc} */
- @Override protected @NotNull Column columnByName(@NotNull String colName) {
- Objects.requireNonNull(colName);
-
- final Column col = schema.column(colName);
-
- if (col == null || schema.keyColumn(col.schemaIndex()))
- throw new IllegalArgumentException("Invalid key column name: columnName=" + colName + ", schemaVersion=" + schema.version());
-
- return col;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] toBytes() {
- return row.valueChunkBytes();
- }
- }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunk.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaManager.java
similarity index 73%
rename from modules/table/src/main/java/org/apache/ignite/internal/table/RowChunk.java
rename to modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaManager.java
index 43ba794..09961ab 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunk.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableSchemaManager.java
@@ -17,14 +17,20 @@
package org.apache.ignite.internal.table;
-import org.apache.ignite.table.Tuple;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
/**
- * Table row chunk.
+ * Table schema manager interface.
*/
-public interface RowChunk extends Tuple {
+public interface TableSchemaManager {
/**
- * @return Row chunk bytes.
+ * @return Current schema.
*/
- public byte[] toBytes();
+ SchemaDescriptor schema();
+
+ /**
+ * @param ver Schema version.
+ * @return Schema of given version.
+ */
+ SchemaDescriptor schema(int ver);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
similarity index 56%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
index e0ff1a7..4e0e850 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RowChunkAdapter.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleBuilderImpl.java
@@ -17,90 +17,82 @@
package org.apache.ignite.internal.table;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjects;
-import org.apache.ignite.internal.schema.Column;
-import org.apache.ignite.internal.schema.Row;
-import org.jetbrains.annotations.NotNull;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.table.TupleBuilder;
/**
- * Row to RowChunk adapter.
+ * Buildable tuple.
*/
-public abstract class RowChunkAdapter implements RowChunk {
- /**
- * @param colName Column name.
- * @return Column.
- */
- @NotNull protected abstract Column columnByName(@NotNull String colName);
+public class TupleBuilderImpl implements TupleBuilder, Tuple {
+ /** Columns values. */
+ private final Map<String, Object> map;
/**
- * @return Underlying row.
+ * Constructor.
*/
- protected abstract Row row();
+ public TupleBuilderImpl() {
+ map = new HashMap<>();
+ }
/** {@inheritDoc} */
- @Override public abstract byte[] toBytes();
+ @Override public TupleBuilder set(String colName, Object value) {
+ map.put(colName, value);
+
+ return this;
+ }
/** {@inheritDoc} */
- @Override public <T> T value(String colName) {
- final Column col = columnByName(colName);
+ @Override public Tuple build() {
+ return this;
+ }
- return (T)col.type().spec().objectValue(row(), col.schemaIndex());
+ @Override public <T> T value(String colName) {
+ return (T)map.get(colName);
}
/** {@inheritDoc} */
@Override public BinaryObject binaryObjectField(String colName) {
- Column col = columnByName(colName);
+ byte[] data = value(colName);
- return BinaryObjects.wrap(row().bytesValue(col.schemaIndex()));
+ return BinaryObjects.wrap(data);
}
/** {@inheritDoc} */
@Override public byte byteValue(String colName) {
- Column col = columnByName(colName);
-
- return row().byteValue(col.schemaIndex());
+ return value(colName);
}
/** {@inheritDoc} */
@Override public short shortValue(String colName) {
- Column col = columnByName(colName);
-
- return row().shortValue(col.schemaIndex());
+ return value(colName);
}
/** {@inheritDoc} */
@Override public int intValue(String colName) {
- Column col = columnByName(colName);
-
- return row().intValue(col.schemaIndex());
+ return value(colName);
}
/** {@inheritDoc} */
@Override public long longValue(String colName) {
- Column col = columnByName(colName);
-
- return row().longValue(col.schemaIndex());
+ return value(colName);
}
/** {@inheritDoc} */
@Override public float floatValue(String colName) {
- Column col = columnByName(colName);
-
- return row().floatValue(col.schemaIndex());
+ return value(colName);
}
/** {@inheritDoc} */
@Override public double doubleValue(String colName) {
- Column col = columnByName(colName);
-
- return row().doubleValue(col.schemaIndex());
+ return value(colName);
}
/** {@inheritDoc} */
@Override public String stringValue(String colName) {
- Column col = columnByName(colName);
-
- return row().stringValue(col.schemaIndex());
+ return value(colName);
}
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
new file mode 100644
index 0000000..5b8a217
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TupleMarshallerImpl.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshaller;
+import org.apache.ignite.table.Tuple;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Marshaller implementation.
+ */
+class TupleMarshallerImpl implements TupleMarshaller {
+ /** Schema manager. */
+ private final TableSchemaManager schemaMgr;
+
+ /**
+ * Constructor.
+ *
+ * @param schemaMgr Schema manager.
+ */
+ TupleMarshallerImpl(TableSchemaManager schemaMgr) {
+ this.schemaMgr = schemaMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row marshal(@NotNull Tuple tuple) {
+ return marshal(tuple, tuple);
+ }
+
+ /** {@inheritDoc} */
+ @Override public Row marshal(Tuple keyTuple, Tuple valTuple) {
+ final SchemaDescriptor schema = schemaMgr.schema();
+
+ assert keyTuple instanceof TupleBuilderImpl;
+
+ final RowAssembler rowBuilder = new RowAssembler(schema, 4096, 0, 0);
+
+ for (int i = 0; i < schema.keyColumns().length(); i++) {
+ final Column col = schema.keyColumns().column(i);
+
+ writeColumn(keyTuple, col, rowBuilder);
+ }
+
+ if (valTuple != null) {
+ for (int i = 0; i < schema.valueColumns().length(); i++) {
+ final Column col = schema.valueColumns().column(i);
+
+ writeColumn(valTuple, col, rowBuilder);
+ }
+ }
+
+ return new Row(schema, new ByteBufferRow(rowBuilder.build()));
+ }
+
+ /**
+ * @param tup Tuple the column value is read from, looked up by column name; a null value is written as null.
+ * @param col Column to write; only LONG columns are handled, any other type throws IllegalStateException.
+ * @param rowAsm Row assembler the value (or null) is appended to.
+ */
+ private void writeColumn(Tuple tup, Column col, RowAssembler rowAsm) {
+ if (tup.value(col.name()) == null) {
+ rowAsm.appendNull();
+ return;
+ }
+
+ switch (col.type().spec()) {
+ case LONG: {
+ rowAsm.appendLong(tup.longValue(col.name()));
+
+ break;
+ }
+
+ default:
+ throw new IllegalStateException("Unexpected value: " + col.type());
+ }
+ }
+
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/Example.java b/modules/table/src/test/java/org/apache/ignite/table/Example.java
index 7fa195f..242a185 100644
--- a/modules/table/src/test/java/org/apache/ignite/table/Example.java
+++ b/modules/table/src/test/java/org/apache/ignite/table/Example.java
@@ -20,10 +20,10 @@ package org.apache.ignite.table;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
+import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.binary.BinaryObjects;
import org.apache.ignite.internal.table.TableImpl;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.table.impl.TestTableStorageImpl;
+import org.apache.ignite.table.impl.DummyInternalTableImpl;
import org.apache.ignite.table.mapper.Mappers;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
@@ -32,14 +32,15 @@ import org.junit.jupiter.params.provider.MethodSource;
/**
*
*/
-@SuppressWarnings({"PMD.EmptyLineSeparatorCheck", "emptylineseparator",
+@SuppressWarnings({
+ "PMD.EmptyLineSeparatorCheck", "emptylineseparator",
"unused", "UnusedAssignment", "InstanceVariableMayNotBeInitialized", "JoinDeclarationAndAssignmentJava"})
public class Example {
/**
* @return Table implementation.
*/
private static List<Table> tableFactory() {
- return Collections.singletonList(new TableImpl(new TestTableStorageImpl()));
+ return Collections.singletonList(new TableImpl(new DummyInternalTableImpl(), null));
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/table/KVViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/table/KVViewOperationsTest.java
new file mode 100644
index 0000000..33f1650
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/table/KVViewOperationsTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.KVBinaryViewImpl;
+import org.apache.ignite.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.table.impl.DummySchemaManagerImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Basic table operations test.
+ * <p>
+ * TODO: IGNITE-14487 Add bulk operations tests.
+ * TODO: IGNITE-14487 Add async operations tests.
+ * TODO: IGNITE-14487 Check that non-key fields in a Tuple are ignored for keys.
+ * TODO: IGNITE-14487 Check whether key fields in a Tuple are ignored for values or an exception is thrown.
+ */
+public class KVViewOperationsTest {
+ /**
+ *
+ */
+ @Test
+ public void testPut() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ KeyValueBinaryView tbl = new KVBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple key = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple val = tbl.tupleBuilder().set("val", 11L).build();
+ final Tuple val2 = tbl.tupleBuilder().set("val", 22L).build();
+ final Tuple val3 = tbl.tupleBuilder().set("val", 33L).build();
+
+ assertNull(tbl.get(key));
+
+ // Put KV pair.
+ tbl.put(key, val);
+
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertEqualsValues(schema, val, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1L).build()));
+
+ // Update KV pair.
+ tbl.put(key, val2);
+
+ assertEqualsValues(schema, val2, tbl.get(key));
+ assertEqualsValues(schema, val2, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1L).build()));
+
+ // Remove KV pair.
+ tbl.put(key, null);
+
+ assertNull(tbl.get(key));
+
+ // Put KV pair.
+ tbl.put(key, val3);
+ assertEqualsValues(schema, val3, tbl.get(key));
+ }
+
+ /**
+ * Checks that putIfAbsent inserts a pair for a missing key and leaves an existing pair untouched.
+ */
+ @Test
+ public void testPutIfAbsent() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ KeyValueBinaryView tbl = new KVBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple key = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple val = tbl.tupleBuilder().set("val", 11L).build();
+ final Tuple val2 = tbl.tupleBuilder().set("val", 22L).build();
+
+ assertNull(tbl.get(key));
+
+ // Insert new KV pair.
+ assertTrue(tbl.putIfAbsent(key, val));
+
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertEqualsValues(schema, val, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1L).build()));
+
+ // The key already exists: putIfAbsent must return false and keep the original value.
+ assertFalse(tbl.putIfAbsent(key, val2));
+
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertEqualsValues(schema, val, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1L).build()));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testGetAndPut() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ KeyValueBinaryView tbl = new KVBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple key = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple val = tbl.tupleBuilder().set("val", 11L).build();
+ final Tuple val2 = tbl.tupleBuilder().set("val", 22L).build();
+ final Tuple val3 = tbl.tupleBuilder().set("val", 33L).build();
+
+ assertNull(tbl.get(key));
+
+ // Insert new tuple.
+ assertNull(tbl.getAndPut(key, val));
+
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertEqualsValues(schema, val, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1L).build()));
+
+ // Check non-value fields is ignored.
+ assertEqualsValues(schema, val, tbl.getAndPut(key, val2));
+ assertEqualsValues(schema, val2, tbl.getAndPut(key, tbl.tupleBuilder().set("id", 2L).set("val", 33L).build()));
+
+ assertEqualsValues(schema, val3, tbl.get(key));
+ assertNull(tbl.get(tbl.tupleBuilder().set("id", 2L).build()));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testRemove() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ KeyValueBinaryView tbl = new KVBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple key = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple key2 = tbl.tupleBuilder().set("id", 2L).build();
+ final Tuple val = tbl.tupleBuilder().set("val", 11L).build();
+ final Tuple val2 = tbl.tupleBuilder().set("val", 22L).build();
+
+ // Put KV pair.
+ tbl.put(key, val);
+
+ // Delete existed key.
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertTrue(tbl.remove(key));
+ assertNull(tbl.get(key));
+
+ // Delete already deleted key.
+ assertFalse(tbl.remove(key));
+
+ // Put KV pair.
+ tbl.put(key, val2);
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Delete existed key.
+ assertTrue(tbl.remove(tbl.tupleBuilder().set("id", 1L).set("val", -1L).build()));
+ assertNull(tbl.get(key));
+
+ // Delete not existed key.
+ assertNull(tbl.get(key2));
+ assertFalse(tbl.remove(key2));
+ }
+
+ /**
+ * Checks remove(key, val): the pair is removed only when the stored value matches; a null value throws NPE.
+ */
+ @Test
+ public void testRemoveExact() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ KeyValueBinaryView tbl = new KVBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple key = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple key2 = tbl.tupleBuilder().set("id", 2L).build();
+ final Tuple val = tbl.tupleBuilder().set("val", 11L).build();
+ final Tuple val2 = tbl.tupleBuilder().set("val", 22L).build();
+
+ // Put KV pair.
+ tbl.put(key, val);
+ assertEqualsValues(schema, val, tbl.get(key));
+
+ // Fails to delete KV pair with unexpected value.
+ assertFalse(tbl.remove(key, val2));
+ assertEqualsValues(schema, val, tbl.get(key));
+
+ // Delete KV pair with expected value.
+ assertTrue(tbl.remove(key, val));
+ assertNull(tbl.get(key));
+
+ // Once again.
+ assertFalse(tbl.remove(key, val));
+ assertNull(tbl.get(key));
+
+ // A null value is rejected with NPE; the already removed key stays absent.
+ assertThrows(NullPointerException.class, () -> tbl.remove(key, null));
+ assertNull(tbl.get(key));
+
+ // Put KV pair.
+ tbl.put(key, val2);
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // A null value is rejected with NPE (not silently ignored); the stored pair is untouched.
+ assertThrows(NullPointerException.class, () -> tbl.remove(key, null));
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Delete KV pair with expected value.
+ assertTrue(tbl.remove(key, val2));
+ assertNull(tbl.get(key));
+
+ assertThrows(NullPointerException.class, () -> tbl.remove(key2, null));
+
+ assertFalse(tbl.remove(key2, val2));
+ assertNull(tbl.get(key2));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testReplace() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ KeyValueBinaryView tbl = new KVBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple key = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple key2 = tbl.tupleBuilder().set("id", 2L).build();
+ final Tuple val = tbl.tupleBuilder().set("val", 11L).build();
+ final Tuple val2 = tbl.tupleBuilder().set("val", 22L).build();
+ final Tuple val3 = tbl.tupleBuilder().set("val", 33L).build();
+
+ // Ignore replace operation for non-existed KV pair.
+ assertFalse(tbl.replace(key, val));
+ assertNull(tbl.get(key));
+
+ tbl.put(key, val);
+
+ // Replace existed KV pair.
+ assertTrue(tbl.replace(key, val2));
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Remove existed KV pair.
+ assertTrue(tbl.replace(key, null));
+ assertNull(tbl.get(key));
+
+ // Ignore replace operation for non-existed KV pair.
+ assertFalse(tbl.replace(key, val3));
+ assertNull(tbl.get(key));
+
+ tbl.put(key, val3);
+ assertEqualsValues(schema, val3, tbl.get(key));
+
+ // Remove non-existed KV pair.
+ assertFalse(tbl.replace(key2, null));
+ assertNull(tbl.get(key2));
+ }
+
+ /**
+ *
+ */
+ @Test
+ public void testReplaceExact() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ KeyValueBinaryView tbl = new KVBinaryViewImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple key = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple key2 = tbl.tupleBuilder().set("id", 2L).build();
+ final Tuple val = tbl.tupleBuilder().set("val", 11L).build();
+ final Tuple val2 = tbl.tupleBuilder().set("val", 22L).build();
+ final Tuple val3 = tbl.tupleBuilder().set("val", 33L).build();
+
+ // Insert KV pair.
+ assertTrue(tbl.replace(key, null, val));
+ assertEqualsValues(schema, val, tbl.get(key));
+ assertNull(tbl.get(key2));
+
+ // Ignore replace operation for non-existed KV pair.
+ assertFalse(tbl.replace(key2, val, val2));
+ assertNull(tbl.get(key2));
+
+ // Replace existed KV pair.
+ assertTrue(tbl.replace(key, val, val2));
+ assertEqualsValues(schema, val2, tbl.get(key));
+
+ // Remove existed KV pair.
+ assertTrue(tbl.replace(key, val2, null));
+ assertNull(tbl.get(key));
+
+ // Insert KV pair.
+ assertTrue(tbl.replace(key, null, val3));
+ assertEqualsValues(schema, val3, tbl.get(key));
+
+ // Remove non-existed KV pair.
+ assertTrue(tbl.replace(key2, null, null));
+ }
+
+ /**
+ * Checks key columns equality and that at least one expected key column is non-null.
+ *
+ * @param schema Schema whose key columns are compared.
+ * @param expected Expected tuple.
+ * @param actual Actual tuple.
+ */
+ void assertEqualsKeys(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+ int nonNullKey = 0;
+
+ for (int i = 0; i < schema.keyColumns().length(); i++) {
+ final Column col = schema.keyColumns().column(i);
+
+ final Object val1 = expected.value(col.name());
+ final Object val2 = actual.value(col.name());
+
+ Assertions.assertEquals(val1, val2, "Key columns equality check failed: colIdx=" + col.schemaIndex());
+
+ if (schema.isKeyColumn(col.schemaIndex()) && val1 != null)
+ nonNullKey++;
+ }
+
+ assertTrue(nonNullKey > 0, "At least one non-null key column must exist.");
+ }
+
+ /**
+ * Checks value columns equality, comparing values looked up by column name.
+ *
+ * @param schema Schema whose value columns are compared.
+ * @param expected Expected tuple.
+ * @param actual Actual tuple.
+ */
+ void assertEqualsValues(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+ for (int i = 0; i < schema.valueColumns().length(); i++) {
+ final Column col = schema.valueColumns().column(i);
+
+ final Object val1 = expected.value(col.name());
+ final Object val2 = actual.value(col.name());
+
+ Assertions.assertEquals(val1, val2, "Value columns equality check failed: colIdx=" + col.schemaIndex());
+ }
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/TableBinaryViewOperationsTest.java b/modules/table/src/test/java/org/apache/ignite/table/TableBinaryViewOperationsTest.java
new file mode 100644
index 0000000..6b011b0
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/table/TableBinaryViewOperationsTest.java
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table;
+
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeType;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.table.impl.DummyInternalTableImpl;
+import org.apache.ignite.table.impl.DummySchemaManagerImpl;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Basic table operations test.
+ * <p>
+ * TODO: IGNITE-14486 Add tests for invoke operations.
+ * TODO: IGNITE-14486 Add tests for bulk operations.
+ * TODO: IGNITE-14486 Add tests for async operations.
+ */
+public class TableBinaryViewOperationsTest {
+ /**
+ * Checks that insert stores a row for a new key and leaves the stored row untouched when the key already exists.
+ */
+ @Test
+ public void testInsert() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple tuple = tbl.tupleBuilder().set("id", 1L).set("val", 11L).build();
+ final Tuple newTuple = tbl.tupleBuilder().set("id", 1L).set("val", 22L).build();
+ final Tuple nonExistedTuple = tbl.tupleBuilder().set("id", 2L).build();
+
+ assertNull(tbl.get(tuple));
+
+ // Insert a new tuple.
+ assertTrue(tbl.insert(tuple));
+
+ assertEqualsRows(schema, tuple, tbl.get(tuple));
+ assertEqualsRows(schema, tuple, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+
+ // Insert must be ignored for an existing row: the originally inserted values are still returned.
+ assertFalse(tbl.insert(newTuple));
+
+ assertEqualsRows(schema, tuple, tbl.get(newTuple));
+ assertEqualsRows(schema, tuple, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+
+ assertNull(tbl.get(nonExistedTuple));
+ }
+
+ /**
+ * Checks that upsert stores a row for a new key and overwrites the stored row for an existing key.
+ */
+ @Test
+ public void testUpsert() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple tuple = tbl.tupleBuilder().set("id", 1L).set("val", 11L).build();
+ final Tuple newTuple = tbl.tupleBuilder().set("id", 1L).set("val", 22L).build();
+ final Tuple nonExistedTuple = tbl.tupleBuilder().set("id", 2L).build();
+
+ assertNull(tbl.get(tbl.tupleBuilder().set("id", 1L).build()));
+ assertNull(tbl.get(tuple));
+
+ // Insert a new tuple.
+ tbl.upsert(tuple);
+
+ assertEqualsRows(schema, tuple, tbl.get(tuple));
+ assertEqualsRows(schema, tuple, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+
+ // Update the existing row.
+ tbl.upsert(newTuple);
+
+ assertEqualsRows(schema, newTuple, tbl.get(tuple));
+ assertEqualsRows(schema, newTuple, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+
+ assertNull(tbl.get(nonExistedTuple));
+ }
+
+ /**
+ * Checks that getAndUpsert returns null for a new key and the previously stored row when it overwrites one.
+ */
+ @Test
+ public void testGetAndUpsert() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple tuple = tbl.tupleBuilder().set("id", 1L).set("val", 11L).build();
+ final Tuple newTuple = tbl.tupleBuilder().set("id", 1L).set("val", 22L).build();
+
+ assertNull(tbl.get(tbl.tupleBuilder().set("id", 1L).build()));
+ assertNull(tbl.get(tuple));
+
+ // Insert a new tuple: there is no previous row to return.
+ assertNull(tbl.getAndUpsert(tuple));
+
+ assertEqualsRows(schema, tuple, tbl.get(tuple));
+ assertEqualsRows(schema, tuple, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+
+ // Update the existing row: the previous row must be returned.
+ assertEqualsRows(schema, tuple, tbl.getAndUpsert(newTuple));
+
+ assertEqualsRows(schema, newTuple, tbl.get(tuple));
+ assertEqualsRows(schema, newTuple, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+ }
+
+ /**
+ * Checks that delete removes an existing row and reports false when the row is already gone.
+ */
+ @Test
+ public void testRemove() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple tuple = tbl.tupleBuilder().set("id", 1L).set("val", 11L).build();
+
+ tbl.upsert(tuple);
+
+ assertEqualsRows(schema, tuple, tbl.get(tuple));
+
+ // The row is also readable through a key-only tuple. NOTE(review): no delete of a never-inserted key is exercised.
+ assertEqualsRows(schema, tuple, tbl.get(tbl.tupleBuilder().set("id", 1L).build()));
+
+ // Delete the existing tuple.
+ assertTrue(tbl.delete(tuple));
+ assertNull(tbl.get(tuple));
+
+ // Delete the already deleted tuple.
+ assertFalse(tbl.delete(tuple));
+ }
+
+ /**
+ * Checks that deleteExact removes a row only when both the key and the value match the stored row.
+ */
+ @Test
+ public void testRemoveExact() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple keyTuple = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple tuple = tbl.tupleBuilder().set("id", 1L).set("val", 11L).build();
+ final Tuple tuple2 = tbl.tupleBuilder().set("id", 1L).set("val", 22L).build();
+ final Tuple nonExistedTuple = tbl.tupleBuilder().set("id", 2L).set("val", 22L).build();
+
+ tbl.insert(tuple);
+
+ assertEqualsRows(schema, tuple, tbl.get(keyTuple));
+
+ // Fails to delete a tuple whose key is not stored.
+ assertFalse(tbl.deleteExact(nonExistedTuple));
+ assertEqualsRows(schema, tuple, tbl.get(keyTuple));
+
+ // Fails to delete a tuple whose value differs from the stored one.
+ assertFalse(tbl.deleteExact(tuple2));
+ assertEqualsRows(schema, tuple, tbl.get(keyTuple));
+
+ // TODO: IGNITE-14479: Fix default value usage.
+// assertFalse(tbl.deleteExact(keyTuple));
+// assertEqualsRows(schema, tuple, tbl.get(keyTuple));
+
+ // Delete the tuple with the expected value.
+ assertTrue(tbl.deleteExact(tuple));
+ assertNull(tbl.get(keyTuple));
+
+ // Once again: the row is already gone.
+ assertFalse(tbl.deleteExact(tuple));
+ assertNull(tbl.get(keyTuple));
+
+ // Insert a new row under the same key.
+ tbl.insert(tuple2);
+ assertEqualsRows(schema, tuple2, tbl.get(keyTuple));
+
+ // Delete the tuple with the expected value.
+ assertTrue(tbl.deleteExact(tuple2));
+ assertNull(tbl.get(keyTuple));
+ }
+
+ /**
+ * Checks that single-argument replace is ignored for a missing key and overwrites the row for an existing key.
+ */
+ @Test
+ public void testReplace() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple keyTuple = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple tuple = tbl.tupleBuilder().set("id", 1L).set("val", 11L).build();
+ final Tuple tuple2 = tbl.tupleBuilder().set("id", 1L).set("val", 22L).build();
+
+ assertNull(tbl.get(keyTuple));
+
+ // Replace must be ignored for a non-existing row.
+ assertFalse(tbl.replace(tuple));
+
+ assertNull(tbl.get(keyTuple));
+ assertNull(tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+
+ // Insert the row.
+ tbl.insert(tuple);
+
+ // Replace the existing row.
+ assertTrue(tbl.replace(tuple2));
+
+ assertEqualsRows(schema, tuple2, tbl.get(keyTuple));
+ assertEqualsRows(schema, tuple2, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+ }
+
+ /**
+ * Checks that two-argument replace swaps the stored row when the expected old row matches it.
+ */
+ @Test
+ public void testReplaceExact() {
+ SchemaDescriptor schema = new SchemaDescriptor(
+ 1,
+ new Column[] {new Column("id", NativeType.LONG, false)},
+ new Column[] {new Column("val", NativeType.LONG, false)}
+ );
+
+ Table tbl = new TableImpl(new DummyInternalTableImpl(), new DummySchemaManagerImpl(schema));
+
+ final Tuple keyTuple = tbl.tupleBuilder().set("id", 1L).build();
+ final Tuple tuple = tbl.tupleBuilder().set("id", 1L).set("val", 11L).build();
+ final Tuple tuple2 = tbl.tupleBuilder().set("id", 1L).set("val", 22L).build();
+
+ assertNull(tbl.get(keyTuple));
+
+ // Replace for a non-existing row (checks disabled until the ticket below is resolved).
+ // TODO: IGNITE-14479: Fix default value usage.
+// assertTrue(tbl.replace(keyTuple, tuple));
+
+// assertNull(tbl.get(keyTuple));
+// assertNull(tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+
+ // Insert the row.
+ tbl.insert(tuple);
+
+ // Replace the existing row.
+ assertTrue(tbl.replace(tuple, tuple2));
+
+ assertEqualsRows(schema, tuple2, tbl.get(keyTuple));
+ assertEqualsRows(schema, tuple2, tbl.get(tbl.tupleBuilder().set("id", 1L).set("val", -1).build()));
+ }
+
+ /**
+ * Asserts that two tuples are equal in both their key columns and their value columns.
+ *
+ * @param schema Schema that lists the key and value columns to compare.
+ * @param expected Expected tuple.
+ * @param actual Actual tuple.
+ */
+ void assertEqualsRows(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+ assertEqualsKeys(schema, expected, actual);
+ assertEqualsValues(schema, expected, actual);
+ }
+
+ /**
+ * Asserts that both tuples hold equal values in every key column and that at least one key value is non-null.
+ *
+ * @param schema Schema.
+ * @param expected Expected tuple.
+ * @param actual Actual tuple.
+ */
+ void assertEqualsKeys(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+ int nonNullKey = 0;
+
+ for (int i = 0; i < schema.keyColumns().length(); i++) {
+ final Column col = schema.keyColumns().column(i);
+
+ final Object val1 = expected.value(col.name());
+ final Object val2 = actual.value(col.name());
+
+ Assertions.assertEquals(val1, val2, "Key columns equality check failed: colIdx=" + col.schemaIndex());
+
+ if (schema.isKeyColumn(i) && val1 != null)
+ nonNullKey++;
+ }
+
+ assertTrue(nonNullKey > 0, "At least one non-null key column must exist.");
+ }
+
+ /**
+ * Asserts that both tuples hold equal values in every value column of the schema.
+ *
+ * @param schema Schema.
+ * @param expected Expected tuple.
+ * @param actual Actual tuple.
+ */
+ void assertEqualsValues(SchemaDescriptor schema, Tuple expected, Tuple actual) {
+ for (int i = 0; i < schema.valueColumns().length(); i++) {
+ final Column col = schema.valueColumns().column(i);
+
+ final Object val1 = expected.value(col.name());
+ final Object val2 = actual.value(col.name());
+
+ Assertions.assertEquals(val1, val2, "Value columns equality check failed: colIdx=" + col.schemaIndex());
+ }
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java b/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
new file mode 100644
index 0000000..5e9a352
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/table/impl/DummyInternalTableImpl.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.table.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.InternalTable;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Dummy table storage implementation.
+ */
+public class DummyInternalTableImpl implements InternalTable {
+ /** In-memory dummy store. */
+ private final Map<KeyWrapper, BinaryRow> store = new ConcurrentHashMap<>();
+
+ /**
+ * Map key wrapping a byte array so that equality is decided by array content instead of array identity.
+ */
+ private static class KeyWrapper {
+ /** Data. */
+ private final byte[] data;
+
+ /** Hash code supplied at construction and returned by {@link #hashCode()}. */
+ private final int hash;
+
+ /**
+ * Creates a wrapper over the given key bytes.
+ * @param data Wrapped data, must not be null.
+ * @param hash Hash code to report for this key; it is not derived from {@code data} here.
+ */
+ KeyWrapper(byte[] data, int hash) {
+ assert data != null;
+
+ this.data = data;
+ this.hash = hash;
+ }
+
+ /** {@inheritDoc} Two wrappers are equal when their byte arrays have equal content. */
+ @Override public boolean equals(Object o) {
+ if (this == o)
+ return true;
+
+ if (o == null || getClass() != o.getClass())
+ return false;
+
+ KeyWrapper wrapper = (KeyWrapper)o;
+ return Arrays.equals(data, wrapper.data);
+ }
+
+ /** {@inheritDoc} Returns the hash passed to the constructor. */
+ @Override public int hashCode() {
+ return hash;
+ }
+ }
+
+ /** {@inheritDoc} Completes with the row stored under the key of the given row, or with null if there is none. */
+ @Override public CompletableFuture<BinaryRow> get(@NotNull BinaryRow row) {
+ assert row != null;
+
+ return CompletableFuture.completedFuture(store.get(extractAndWrapKey(row)));
+ }
+
+ /** {@inheritDoc} Stores the row under its key, overwriting any row already stored there. */
+ @Override public CompletableFuture<Void> upsert(@NotNull BinaryRow row) {
+ assert row != null;
+
+ store.put(extractAndWrapKey(row), row);
+
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /** {@inheritDoc} Stores the row under its key and completes with the previously stored row, or with null. */
+ @Override public CompletableFuture<BinaryRow> getAndUpsert(@NotNull BinaryRow row) {
+ assert row != null;
+
+ final BinaryRow old = store.put(extractAndWrapKey(row), row);
+
+ return CompletableFuture.completedFuture(old);
+ }
+
+ /** {@inheritDoc} Completes with true only if a stored row that has a value was found under the key and removed. */
+ @Override public @NotNull CompletableFuture<Boolean> delete(BinaryRow row) {
+ assert row != null;
+
+ final KeyWrapper key = extractAndWrapKey(row);
+ final BinaryRow oldVal = store.get(key); // Read first: rows without a value are not removed.
+
+ return CompletableFuture.completedFuture(oldVal != null && oldVal.hasValue() && store.remove(key, oldVal));
+ }
+
+ /** {@inheritDoc} Completes with one entry per requested row, in iteration order; a missing key yields null. */
+ @Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
+ assert keyRows != null && !keyRows.isEmpty();
+
+ final List<BinaryRow> res = keyRows.stream()
+ .map(this::extractAndWrapKey)
+ .map(store::get)
+ .collect(Collectors.toList());
+
+ return CompletableFuture.completedFuture(res);
+ }
+
+ /** {@inheritDoc} Stores every given row under its key, overwriting rows already stored there. */
+ @Override public @NotNull CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows) {
+ assert rows != null && !rows.isEmpty();
+
+ // Eager iteration is required: a stream pipeline with only map() never runs, so nothing would be stored.
+ rows.forEach(k -> store.put(extractAndWrapKey(k), k));
+
+ return CompletableFuture.completedFuture(null);
+ }
+
+ /** {@inheritDoc} Stores the row only if its key is absent; completes with true if the row was stored. */
+ @Override public @NotNull CompletableFuture<Boolean> insert(BinaryRow row) {
+ assert row != null;
+
+ return CompletableFuture.completedFuture(store.putIfAbsent(extractAndWrapKey(row), row) == null);
+ }
+
+ /** {@inheritDoc} Stores rows whose key is absent; completes with the rows skipped because their key was present. */
+ @Override public @NotNull CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows) {
+ assert rows != null && !rows.isEmpty();
+
+ final List<BinaryRow> res = rows.stream()
+ .map(k -> store.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k) // null marks "inserted".
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ return CompletableFuture.completedFuture(res);
+ }
+
+ /** {@inheritDoc} Overwrites the stored row only if a row with a value already exists under the key. */
+ @Override public @NotNull CompletableFuture<Boolean> replace(BinaryRow row) {
+ assert row != null;
+
+ final KeyWrapper key = extractAndWrapKey(row);
+ final BinaryRow oldRow = store.get(key);
+
+ if (oldRow == null || !oldRow.hasValue())
+ return CompletableFuture.completedFuture(false);
+
+ return CompletableFuture.completedFuture(store.put(key, row) == oldRow); // Reference check against the row read above.
+ }
+
+ /** {@inheritDoc} Stores {@code newRow} when the stored state under the key of {@code oldRow} matches {@code oldRow}. */
+ @Override public @NotNull CompletableFuture<Boolean> replace(BinaryRow oldRow, BinaryRow newRow) {
+ assert oldRow != null;
+ assert newRow != null;
+
+ final KeyWrapper key = extractAndWrapKey(oldRow); // newRow is stored under the key of oldRow.
+ final BinaryRow row = store.get(key);
+
+ if (row == null) // Nothing stored: succeeds only when oldRow carries no value.
+ return CompletableFuture.completedFuture(!oldRow.hasValue() && store.put(key, newRow) == null);
+
+ return CompletableFuture.completedFuture(equalValues(row, oldRow) && store.put(key, newRow) != null);
+ }
+
+ @Override public @NotNull CompletableFuture<BinaryRow> getAndReplace(BinaryRow row) {
+ return null; // NOTE(review): stub, not implemented in this dummy table; returns null despite @NotNull.
+ }
+
+ /** {@inheritDoc} Removes the stored row only if it has a value equal to the value of the given row. */
+ @Override public @NotNull CompletableFuture<Boolean> deleteExact(BinaryRow row) {
+ assert row != null;
+ assert row.hasValue();
+
+ final KeyWrapper key = extractAndWrapKey(row);
+ final BinaryRow old = store.get(key);
+
+ if (old == null || !old.hasValue())
+ return CompletableFuture.completedFuture(false);
+
+ return CompletableFuture.completedFuture(equalValues(row, old) && store.remove(key) != null);
+ }
+
+ @Override public @NotNull CompletableFuture<BinaryRow> getAndDelete(BinaryRow row) {
+ return null; // NOTE(review): stub, not implemented in this dummy table; returns null despite @NotNull.
+ }
+
+ @Override public @NotNull CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows) {
+ return null; // NOTE(review): stub, not implemented in this dummy table; returns null despite @NotNull.
+ }
+
+ @Override public @NotNull CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows) {
+ return null; // NOTE(review): stub, not implemented in this dummy table; returns null despite @NotNull.
+ }
+
+ /**
+ * @param row Row whose key bytes are copied into a new array.
+ * @return Wrapper over the copied key bytes that reports {@code row.hash()} as its hash code.
+ */
+ @NotNull private DummyInternalTableImpl.KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
+ final byte[] bytes = new byte[row.keySlice().capacity()];
+ row.keySlice().get(bytes); // NOTE(review): presumably keySlice() returns a fresh buffer per call - confirm.
+
+ return new KeyWrapper(bytes, row.hash());
+ }
+
+ /**
+ * @param row First row.
+ * @param row2 Second row.
+ * @return {@code false} if exactly one row has a value, otherwise whether both value slices compare as equal.
+ */
+ private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {
+ if (row.hasValue() != row2.hasValue())
+ return false;
+ return row.valueSlice().compareTo(row2.valueSlice()) == 0;
+ }
+}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java b/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
similarity index 51%
copy from modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java
copy to modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
index f87f91a..01df2fe 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/marshaller/SerializationException.java
+++ b/modules/table/src/test/java/org/apache/ignite/table/impl/DummySchemaManagerImpl.java
@@ -15,28 +15,41 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.schema.marshaller;
+package org.apache.ignite.table.impl;
+
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.TableSchemaManager;
+import org.jetbrains.annotations.NotNull;
/**
- * Serialization exception.
+ * Dummy schema manager for tests.
*/
-public class SerializationException extends Exception {
+public class DummySchemaManagerImpl implements TableSchemaManager {
+ /** Schema. */
+ private final SchemaDescriptor schema;
+
/**
* Constructor.
*
- * @param cause Cause.
+ * @param schema Schema descriptor.
*/
- public SerializationException(Throwable cause) {
- super(cause);
+ public DummySchemaManagerImpl(@NotNull SchemaDescriptor schema) {
+ assert schema != null;
+
+ this.schema = schema;
}
- /**
- * Constructor.
- *
- * @param message Message.
- * @param cause Cause.
- */
- public SerializationException(String message, Throwable cause) {
- super(message, cause);
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor schema() {
+ return schema;
+ }
+
+ /** {@inheritDoc} */
+ @Override public SchemaDescriptor schema(int ver) {
+ assert ver >= 0;
+
+ assert schema.version() == ver;
+
+ return schema;
}
}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/TestTableRowImpl.java b/modules/table/src/test/java/org/apache/ignite/table/impl/TestTableRowImpl.java
deleted file mode 100644
index 5c64b29..0000000
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/TestTableRowImpl.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.table.impl;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.internal.schema.Row;
-import org.apache.ignite.internal.table.RowChunk;
-import org.apache.ignite.internal.table.TableRow;
-
-/**
- * Dummy implementation class.
- */
-public class TestTableRowImpl implements TableRow {
- /** Key offset in tuple. */
- private static final int KEY_OFFSET = Row.KEY_HASH_FIELD_OFFSET;
-
- /** Payload. */
- private final byte[] bytes;
-
- /**
- * Constructor.
- *
- * @param bytes Bytes to wrap.
- */
- public TestTableRowImpl(byte[] bytes) {
- this.bytes = bytes.clone();
- }
-
- /** {@inheritDoc} */
- @Override public byte[] toBytes() {
- return bytes.clone();
- }
-
- /** {@inheritDoc} */
- @Override public RowChunk keyChunk() {
- return new TestTableRowImpl(bytes) {
- @Override public byte[] toBytes() {
- ByteBuffer buf = ByteBuffer.wrap(bytes());
-
- int keyLen = buf.getInt(KEY_OFFSET);
-
- return buf.position(KEY_OFFSET).limit(keyLen).slice().array();
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public RowChunk valueChunk() {
- return new TestTableRowImpl(bytes) {
- @Override public byte[] toBytes() {
- ByteBuffer buf = ByteBuffer.wrap(bytes());
-
- int valOffset = KEY_OFFSET + buf.getInt(KEY_OFFSET);
-
- return buf.position(valOffset).slice().array();
- }
- };
- }
-
- /** {@inheritDoc} */
- @Override public <T> T value(String colName) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public BinaryObject binaryObjectField(String colName) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public byte byteValue(String colName) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public short shortValue(String colName) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public int intValue(String colName) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public long longValue(String colName) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public float floatValue(String colName) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public double doubleValue(String colName) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public String stringValue(String colName) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long schemaVersion() {
- return 0;
- }
-
- /**
- *
- */
- private byte[] bytes() {
- return bytes;
- }
-}
diff --git a/modules/table/src/test/java/org/apache/ignite/table/impl/TestTableStorageImpl.java b/modules/table/src/test/java/org/apache/ignite/table/impl/TestTableStorageImpl.java
deleted file mode 100644
index 7814787..0000000
--- a/modules/table/src/test/java/org/apache/ignite/table/impl/TestTableStorageImpl.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.table.impl;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.ignite.internal.storage.TableStorage;
-import org.apache.ignite.internal.table.TableRow;
-
-/**
- * Dummy table storage implementation.
- */
-public class TestTableStorageImpl implements TableStorage {
- /** In-memory dummy store. */
- private final Map<KeyChunk, TestTableRowImpl> store = new ConcurrentHashMap<>();
-
- /** {@inheritDoc} */
- @Override public TableRow get(TableRow obj) {
- return store.get(new KeyChunk(obj.keyChunk().toBytes()));
- }
-
- /** {@inheritDoc} */
- @Override public TableRow put(TableRow row) {
- return store.put(
- new KeyChunk(row.keyChunk().toBytes()),
- new TestTableRowImpl(row.toBytes()));
- }
-
- /**
- * Wrapper provides correct byte[] comparison.
- */
- private static class KeyChunk {
- /** Data. */
- private final byte[] data;
-
- /** Hash. */
- private final int hash;
-
- /**
- * Constructor.
- *
- * @param data Wrapped data.
- */
- KeyChunk(byte[] data) {
- this.data = data;
- this.hash = Arrays.hashCode(data);
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object o) {
- if (this == o)
- return true;
-
- if (o == null || getClass() != o.getClass())
- return false;
-
- KeyChunk wrapper = (KeyChunk)o;
- return Arrays.equals(data, wrapper.data);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return hash;
- }
- }
-}