You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sk...@apache.org on 2021/06/02 07:09:24 UTC
[ignite-3] branch main updated: IGNITE-14239 Raft based
implementation of atomic protocol. Fixes #118
This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4e912b5 IGNITE-14239 Raft based implementation of atomic protocol. Fixes #118
4e912b5 is described below
commit 4e912b58dcde8a46ff4da4769a26c7c301ab691e
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Wed Jun 2 10:00:25 2021 +0300
IGNITE-14239 Raft based implementation of atomic protocol. Fixes #118
Signed-off-by: Slava Koptilin <sl...@gmail.com>
---
.../java/org/apache/ignite/table/KeyValueView.java | 151 +++--
.../java/org/apache/ignite/table/TableView.java | 154 +++--
.../ignite/distributed/ITDistributedTableTest.java | 242 ++++++-
.../ignite/internal/table/KVBinaryViewImpl.java | 137 ++--
.../apache/ignite/internal/table/KVViewImpl.java | 62 +-
.../ignite/internal/table/RecordViewImpl.java | 72 +--
.../apache/ignite/internal/table/TableImpl.java | 168 +++--
.../table/distributed/command/CommandUtils.java | 158 +++++
.../distributed/command/DeleteAllCommand.java | 68 ++
.../table/distributed/command/DeleteCommand.java | 43 +-
.../distributed/command/DeleteExactAllCommand.java | 68 ++
...{InsertCommand.java => DeleteExactCommand.java} | 50 +-
.../table/distributed/command/GetAllCommand.java | 68 ++
...DeleteCommand.java => GetAndDeleteCommand.java} | 50 +-
...nsertCommand.java => GetAndReplaceCommand.java} | 49 +-
...DeleteCommand.java => GetAndUpsertCommand.java} | 52 +-
.../table/distributed/command/GetCommand.java | 43 +-
.../distributed/command/InsertAllCommand.java | 68 ++
.../table/distributed/command/InsertCommand.java | 43 +-
.../table/distributed/command/ReplaceCommand.java | 55 +-
...sertCommand.java => ReplaceIfExistCommand.java} | 49 +-
.../distributed/command/UpsertAllCommand.java | 68 ++
.../table/distributed/command/UpsertCommand.java | 43 +-
.../command/response/MultiRowsResponse.java | 71 +++
.../{KVGetResponse.java => SingleRowResponse.java} | 57 +-
.../table/distributed/raft/PartitionListener.java | 243 ++++++-
.../distributed/storage/InternalTableImpl.java | 148 ++++-
.../raft/PartitionCommandListenerTest.java | 700 +++++++++++++++++++++
28 files changed, 2426 insertions(+), 754 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
index f58fb65..ef0a80d 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/KeyValueView.java
@@ -36,176 +36,197 @@ public interface KeyValueView<K, V> {
/**
* Gets a value associated with the given key.
*
- * @param key The key whose associated value is to be returned.
+ * @param key A key whose associated value is to be returned.
+ * The key cannot be {@code null}.
* @return Value or {@code null}, if it does not exist.
*/
- V get(K key);
+ V get(@NotNull K key);
/**
* Asynchronously gets a value associated with the given key.
*
- * @param key The key whose associated value is to be returned.
+ * @param key A key whose associated value is to be returned.
+ * The key cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<V> getAsync(K key);
+ @NotNull CompletableFuture<V> getAsync(@NotNull K key);
/**
* Get values associated with given keys.
*
- * @param keys Keys whose associated values are to be returned.
+ * @param keys Keys whose associated values are to be returned.
+ * The keys cannot be {@code null}.
* @return Values associated with given keys.
*/
- Map<K, V> getAll(Collection<K> keys);
+ Map<K, V> getAll(@NotNull Collection<K> keys);
/**
* Get values associated with given keys.
*
* @param keys Keys whose associated values are to be returned.
+ * The keys cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys);
+ @NotNull CompletableFuture<Map<K, V>> getAllAsync(@NotNull Collection<K> keys);
/**
* Determines if the table contains an entry for the specified key.
*
- * @param key The key whose presence is to be tested.
+ * @param key A key whose presence is to be tested.
+ * The key cannot be {@code null}.
* @return {@code True} if a value exists for the specified key, {@code false} otherwise.
*/
- boolean contains(K key);
+ boolean contains(@NotNull K key);
/**
* Puts value associated with given key into the table.
*
- * @param key Key with which the specified value is to be associated.
+ * @param key A key with which the specified value is to be associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
*/
- void put(K key, V val);
+ void put(@NotNull K key, V val);
/**
* Asynchronously puts value associated with given key into the table.
*
- * @param key Key with which the specified value is to be associated.
+ * @param key A key with which the specified value is to be associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Void> putAsync(K key, V val);
+ @NotNull CompletableFuture<Void> putAsync(@NotNull K key, V val);
/**
* Put associated key-value pairs.
*
* @param pairs Key-value pairs.
+ * The pairs cannot be {@code null}.
*/
- void putAll(Map<K, V> pairs);
+ void putAll(@NotNull Map<K, V> pairs);
/**
* Asynchronously put associated key-value pairs.
*
* @param pairs Key-value pairs.
+ * The pairs cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Void> putAllAsync(Map<K, V> pairs);
+ @NotNull CompletableFuture<Void> putAllAsync(@NotNull Map<K, V> pairs);
/**
* Puts new or replaces existed value associated with given key into the table.
*
- * @param key Key with which the specified value is to be associated.
+ * @param key A key with which the specified value is to be associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return Replaced value or {@code null}, if not existed.
*/
- V getAndPut(K key, V val);
+ V getAndPut(@NotNull K key, V val);
/**
* Asynchronously puts new or replaces existed value associated with given key into the table.
*
- * @param key Key with which the specified value is to be associated.
+ * @param key A key with which the specified value is to be associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<V> getAndPutAsync(K key, V val);
+ @NotNull CompletableFuture<V> getAndPutAsync(@NotNull K key, V val);
/**
* Puts value associated with given key into the table if not exists.
*
- * @param key Key with which the specified value is to be associated.
+ * @param key A key with which the specified value is to be associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return {@code True} if successful, {@code false} otherwise.
*/
- boolean putIfAbsent(K key, @NotNull V val);
+ boolean putIfAbsent(@NotNull K key, @NotNull V val);
/**
* Asynchronously puts value associated with given key into the table if not exists.
*
* @param key Key with which the specified value is to be associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> putIfAbsentAsync(K key, V val);
+ @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull K key, V val);
/**
* Removes value associated with given key from the table.
*
- * @param key Key whose mapping is to be removed from the table.
+ * @param key A key whose mapping is to be removed from the table.
+ * The key cannot be {@code null}.
* @return {@code True} if a value associated with the specified key was successfully removed, {@code false} otherwise.
*/
- boolean remove(K key);
+ boolean remove(@NotNull K key);
/**
* Asynchronously removes value associated with given key from the table.
*
- * @param key Key whose mapping is to be removed from the table.
+ * @param key A key whose mapping is to be removed from the table.
+ * The key cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> removeAsync(K key);
+ @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key);
/**
- * Removes expected value associated with given key from the table.
+ * Removes an expected value associated with the given key from the table.
*
- * @param key Key whose associated value is to be removed from the table.
- * @param val Expected value.
+ * @param key A key whose associated value is to be removed from the table.
+ * The key cannot be {@code null}.
+ * @param val Expected value. The value cannot be {@code null}.
* @return {@code True} if the expected value for the specified key was successfully removed, {@code false} otherwise.
*/
- boolean remove(K key, @NotNull V val);
+ boolean remove(@NotNull K key, @NotNull V val);
/**
* Asynchronously removes expected value associated with given key from the table.
*
- * @param key Key whose associated value is to be removed from the table.
- * @param val Expected value.
+ * @param key A key whose associated value is to be removed from the table.
+ * The key cannot be {@code null}.
+ * @param val Expected value. The value cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> removeAsync(K key, V val);
+ @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key, @NotNull V val);
/**
* Remove values associated with given keys from the table.
*
- * @param keys Keys whose mapping is to be removed from the table.
+ * @param keys Keys whose mapping is to be removed from the table.
+ * The keys cannot be {@code null}.
* @return Keys whose values were not existed.
*/
- Collection<K> removeAll(Collection<K> keys);
+ Collection<K> removeAll(@NotNull Collection<K> keys);
/**
* Asynchronously remove values associated with given keys from the table.
*
- * @param keys Keys whose mapping is to be removed from the table.
+ * @param keys Keys whose mapping is to be removed from the table.
+ * The keys cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<K> removeAllAsync(Collection<K> keys);
+ @NotNull CompletableFuture<Collection<K>> removeAllAsync(@NotNull Collection<K> keys);
/**
* Gets then removes value associated with given key from the table.
*
- * @param key Key whose associated value is to be removed from the table.
+ * @param key A key whose associated value is to be removed from the table.
+ * The key cannot be {@code null}.
* @return Removed value or {@code null}, if not existed.
*/
- V getAndRemove(K key);
+ V getAndRemove(@NotNull K key);
/**
* Asynchronously gets then removes value associated with given key from the table.
*
- * @param key Key whose mapping is to be removed from the table.
+ * @param key A key whose mapping is to be removed from the table.
+ * The key cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<V> getAndRemoveAsync(K key);
+ @NotNull CompletableFuture<V> getAndRemoveAsync(@NotNull K key);
/**
* Replaces the value for a key only if exists. This is equivalent to
@@ -218,21 +239,23 @@ public interface KeyValueView<K, V> {
* }</code></pre>
* except that the action is performed atomically.
*
- * @param key Key with which the specified value is associated.
+ * @param key A key with which the specified value is associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return {@code True} if an old value was replaced, {@code false} otherwise.
*/
- boolean replace(K key, V val);
+ boolean replace(@NotNull K key, V val);
/**
* Asynchronously replaces the value for a key only if exists.
* See {@link #replace(Object, Object)}.
*
- * @param key Key with which the specified value is associated.
+ * @param key A key with which the specified value is associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> replaceAsync(K key, V val);
+ @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V val);
/**
* Replaces the expected value for a key. This is equivalent to
@@ -245,23 +268,25 @@ public interface KeyValueView<K, V> {
* }</code></pre>
* except that the action is performed atomically.
*
- * @param key Key with which the specified value is associated.
+ * @param key A key with which the specified value is associated.
+ * The key cannot be {@code null}.
* @param oldVal Expected value associated with the specified key.
* @param newVal Value to be associated with the specified key.
* @return {@code True} if an old value was replaced, {@code false} otherwise.
*/
- boolean replace(K key, V oldVal, V newVal);
+ boolean replace(@NotNull K key, V oldVal, V newVal);
/**
* Asynchronously replaces the expected value for a key.
* See {@link #replace(Object, Object, Object)}
*
- * @param key Key with which the specified value is associated.
+ * @param key A key with which the specified value is associated.
+ * The key cannot be {@code null}.
* @param oldVal Expected value associated with the specified key.
* @param newVal Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> replaceAsync(K key, V oldVal, V newVal);
+ @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V oldVal, V newVal);
/**
* Replaces the value for a given key only if exists. This is equivalent to
@@ -276,45 +301,51 @@ public interface KeyValueView<K, V> {
* </code></pre>
* except that the action is performed atomically.
*
- * @param key Key with which the specified value is associated.
+ * @param key A key with which the specified value is associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return Replaced value, or {@code null} if not existed.
*/
- V getAndReplace(K key, V val);
+ V getAndReplace(@NotNull K key, V val);
/**
* Asynchronously replaces the value for a given key only if exists.
* See {@link #getAndReplace(Object, Object)}
*
- * @param key Key with which the specified value is associated.
+ * @param key A key with which the specified value is associated.
+ * The key cannot be {@code null}.
* @param val Value to be associated with the specified key.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<V> getAndReplaceAsync(K key, V val);
+ @NotNull CompletableFuture<V> getAndReplaceAsync(@NotNull K key, V val);
/**
* Executes invoke processor code against the value associated with the provided key.
*
- * @param key Key associated with the value that invoke processor will be applied to.
+ * @param key A key associated with the value that invoke processor will be applied to.
+ * The key cannot be {@code null}.
* @param proc Invoke processor.
* @param args Optional invoke processor arguments.
* @param <R> Invoke processor result type.
* @return Result of the processing.
* @see InvokeProcessor
*/
- <R extends Serializable> R invoke(K key, InvokeProcessor<K, V, R> proc, Serializable... args);
+ <R extends Serializable> R invoke(@NotNull K key, InvokeProcessor<K, V, R> proc, Serializable... args);
/**
* Asynchronously executes invoke processor code against the value associated with the provided key.
*
- * @param key Key associated with the value that invoke processor will be applied to.
+ * @param key A key associated with the value that invoke processor will be applied to.
+ * The key cannot be {@code null}.
* @param proc Invoke processor.
* @param args Optional invoke processor arguments.
* @param <R> Invoke processor result type.
* @return Future representing pending completion of the operation.
* @see InvokeProcessor
*/
- @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(K key, InvokeProcessor<K, V, R> proc,
+ @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
+ @NotNull K key,
+ InvokeProcessor<K, V, R> proc,
Serializable... args);
/**
@@ -322,13 +353,14 @@ public interface KeyValueView<K, V> {
*
* @param <R> Invoke processor result type.
* @param keys Ordered collection of keys which values associated with should be processed.
+ * The keys cannot be {@code null}.
* @param proc Invoke processor.
* @param args Optional invoke processor arguments.
* @return Results of the processing.
* @see InvokeProcessor
*/
<R extends Serializable> Map<K, R> invokeAll(
- Collection<K> keys,
+ @NotNull Collection<K> keys,
InvokeProcessor<K, V, R> proc,
Serializable... args);
@@ -337,13 +369,14 @@ public interface KeyValueView<K, V> {
*
* @param <R> Invoke processor result type.
* @param keys Ordered collection of keys which values associated with should be processed.
+ * The keys cannot be {@code null}.
* @param proc Invoke processor.
* @param args Optional invoke processor arguments.
* @return Future representing pending completion of the operation.
* @see InvokeProcessor
*/
@NotNull <R extends Serializable> CompletableFuture<Map<K, R>> invokeAllAsync(
- Collection<K> keys,
+ @NotNull Collection<K> keys,
InvokeProcessor<K, V, R> proc,
Serializable... args);
}
diff --git a/modules/api/src/main/java/org/apache/ignite/table/TableView.java b/modules/api/src/main/java/org/apache/ignite/table/TableView.java
index 02ac2a1..6ea9109 100644
--- a/modules/api/src/main/java/org/apache/ignite/table/TableView.java
+++ b/modules/api/src/main/java/org/apache/ignite/table/TableView.java
@@ -34,285 +34,321 @@ public interface TableView<R> {
/**
* Gets a record with same key columns values as given one from the table.
*
- * @param keyRec Record with key columns set.
- * @return Record with all columns filled from the table.
+ * @param keyRec A record with key columns set.
+ * The record cannot be {@code null}.
+ * @return A record with all columns filled from the table.
*/
- R get(R keyRec);
+ R get(@NotNull R keyRec);
/**
* Asynchronously gets a record with same key columns values as given one from the table.
*
- * @param keyRec Record with key columns set.
+ * @param keyRec A record with key columns set.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<R> getAsync(R keyRec);
+ @NotNull CompletableFuture<R> getAsync(@NotNull R keyRec);
/**
* Get records from the table.
*
* @param keyRecs Records with key columns set.
+ * The records cannot be {@code null}.
* @return Records with all columns filled from the table.
*/
- Collection<R> getAll(Collection<R> keyRecs);
+ Collection<R> getAll(@NotNull Collection<R> keyRecs);
/**
* Asynchronously get records from the table.
*
* @param keyRecs Records with key columns set.
+ * The records cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Collection<R>> getAllAsync(Collection<R> keyRecs);
+ @NotNull CompletableFuture<Collection<R>> getAllAsync(@NotNull Collection<R> keyRecs);
/**
* Inserts a record into the table if does not exist or replaces the existed one.
*
- * @param rec Record to insert into the table.
+ * @param rec A record to insert into the table.
+ * The record cannot be {@code null}.
*/
- void upsert(R rec);
+ void upsert(@NotNull R rec);
/**
* Asynchronously inserts a record into the table if does not exist or replaces the existed one.
*
- * @param rec Record to insert into the table.
+ * @param rec A record to insert into the table.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Void> upsertAsync(R rec);
+ @NotNull CompletableFuture<Void> upsertAsync(@NotNull R rec);
/**
* Insert records into the table if does not exist or replaces the existed one.
*
* @param recs Records to insert into the table.
+ * The records cannot be {@code null}.
*/
- void upsertAll(Collection<R> recs);
+ void upsertAll(@NotNull Collection<R> recs);
/**
* Asynchronously inserts a record into the table if does not exist or replaces the existed one.
*
* @param recs Records to insert into the table.
+ * The records cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Void> upsertAllAsync(Collection<R> recs);
+ @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<R> recs);
/**
* Inserts a record into the table or replaces if exists and return replaced previous record.
*
- * @param rec Record to insert into the table.
+ * @param rec A record to insert into the table.
+ * The record cannot be {@code null}.
* @return Replaced record or {@code null} if not existed.
*/
- R getAndUpsert(R rec);
+ R getAndUpsert(@NotNull R rec);
/**
* Asynchronously inserts a record into the table or replaces if exists and return replaced previous record.
*
- * @param rec Record to insert into the table.
+ * @param rec A record to insert into the table.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<R> getAndUpsertAsync(R rec);
+ @NotNull CompletableFuture<R> getAndUpsertAsync(@NotNull R rec);
/**
* Inserts a record into the table if not exists.
*
- * @param rec Record to insert into the table.
+ * @param rec A record to insert into the table.
+ * The record cannot be {@code null}.
* @return {@code True} if successful, {@code false} otherwise.
*/
- boolean insert(R rec);
+ boolean insert(@NotNull R rec);
/**
* Asynchronously inserts a record into the table if not exists.
*
- * @param rec Record to insert into the table.
+ * @param rec A record to insert into the table.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> insertAsync(R rec);
+ @NotNull CompletableFuture<Boolean> insertAsync(@NotNull R rec);
/**
* Insert records into the table which do not exist, skipping existed ones.
*
* @param recs Records to insert into the table.
+ * The records cannot be {@code null}.
* @return Skipped records.
*/
- Collection<R> insertAll(Collection<R> recs);
+ Collection<R> insertAll(@NotNull Collection<R> recs);
/**
* Asynchronously insert records into the table which do not exist, skipping existed ones.
*
* @param recs Records to insert into the table.
+ * The records cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Collection<R>> insertAllAsync(Collection<R> recs);
+ @NotNull CompletableFuture<Collection<R>> insertAllAsync(@NotNull Collection<R> recs);
/**
* Replaces an existed record associated with the same key columns values as the given one has.
*
- * @param rec Record to replace with.
+ * @param rec A record to replace with.
+ * The record cannot be {@code null}.
* @return {@code True} if old record was found and replaced successfully, {@code false} otherwise.
*/
- boolean replace(R rec);
+ boolean replace(@NotNull R rec);
/**
* Asynchronously replaces an existed record associated with the same key columns values as the given one has.
*
- * @param rec Record to replace with.
+ * @param rec A record to replace with.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> replaceAsync(R rec);
+ @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull R rec);
/**
* Replaces an expected record in the table with the given new one.
*
- * @param oldRec Record to replace.
- * @param newRec Record to replace with.
+ * @param oldRec A record to replace.
+ * The record cannot be {@code null}.
+ * @param newRec A record to replace with.
+ * The record cannot be {@code null}.
* @return {@code True} if the old record replaced successfully, {@code false} otherwise.
*/
- boolean replace(R oldRec, R newRec);
+ boolean replace(@NotNull R oldRec, @NotNull R newRec);
/**
* Asynchronously replaces an expected record in the table with the given new one.
*
- * @param oldRec Record to replace.
- * @param newRec Record to replace with.
+ * @param oldRec A record to replace.
+ * The record cannot be {@code null}.
+ * @param newRec A record to replace with.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> replaceAsync(R oldRec, R newRec);
+ @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull R oldRec, @NotNull R newRec);
/**
* Gets an existed record associated with the same key columns values as the given one has,
* then replaces with the given one.
*
- * @param rec Record to replace with.
+ * @param rec A record to replace with.
+ * The record cannot be {@code null}.
* @return Replaced record or {@code null} if not existed.
*/
- R getAndReplace(R rec);
+ R getAndReplace(@NotNull R rec);
/**
* Asynchronously gets an existed record associated with the same key columns values as the given one has,
* then replaces with the given one.
*
- * @param rec Record to replace with.
+ * @param rec A record to replace with.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<R> getAndReplaceAsync(R rec);
+ @NotNull CompletableFuture<R> getAndReplaceAsync(@NotNull R rec);
/**
* Deletes a record with the same key columns values as the given one from the table.
*
- * @param keyRec Record with key columns set.
+ * @param keyRec A record with key columns set.
+ * The record cannot be {@code null}.
* @return {@code True} if removed successfully, {@code false} otherwise.
*/
- boolean delete(R keyRec);
+ boolean delete(@NotNull R keyRec);
/**
* Asynchronously deletes a record with the same key columns values as the given one from the table.
*
- * @param keyRec Record with key columns set.
+ * @param keyRec A record with key columns set.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> deleteAsync(R keyRec);
+ @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull R keyRec);
/**
* Deletes the given record from the table.
*
- * @param rec Record to delete.
+ * @param rec A record to delete.
+ * The record cannot be {@code null}.
* @return {@code True} if removed successfully, {@code false} otherwise.
*/
- boolean deleteExact(R rec);
+ boolean deleteExact(@NotNull R rec);
/**
* Asynchronously deletes given record from the table.
*
- * @param rec Record to delete.
+ * @param rec A record to delete.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Boolean> deleteExactAsync(R rec);
+ @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull R rec);
/**
* Gets then deletes a record with the same key columns values from the table.
*
- * @param rec Record with key columns set.
+ * @param rec A record with key columns set.
+ * The record cannot be {@code null}.
* @return Removed record or {@code null} if not existed.
*/
- R getAndDelete(R rec);
+ R getAndDelete(@NotNull R rec);
/**
* Asynchronously gets then deletes a record with the same key columns values from the table.
*
- * @param rec Record with key columns set.
+ * @param rec A record with key columns set.
+ * The record cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<R> getAndDeleteAsync(R rec);
+ @NotNull CompletableFuture<R> getAndDeleteAsync(@NotNull R rec);
/**
* Remove records with the same key columns values as the given one has from the table.
*
* @param recs Records with key columns set.
+ * The records cannot be {@code null}.
* @return Records with key columns set that were not exists.
*/
- Collection<R> deleteAll(Collection<R> recs);
+ Collection<R> deleteAll(@NotNull Collection<R> recs);
/**
* Asynchronously remove records with the same key columns values as the given one has from the table.
*
* @param recs Records with key columns set.
+ * The records cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Collection<R>> deleteAllAsync(Collection<R> recs);
+ @NotNull CompletableFuture<Collection<R>> deleteAllAsync(@NotNull Collection<R> recs);
/**
* Remove given records from the table.
*
* @param recs Records to delete.
+ * The records cannot be {@code null}.
* @return Records that were not deleted.
*/
- Collection<R> deleteAllExact(Collection<R> recs);
+ Collection<R> deleteAllExact(@NotNull Collection<R> recs);
/**
* Asynchronously remove given records from the table.
*
* @param recs Records to delete.
+ * The records cannot be {@code null}.
* @return Future representing pending completion of the operation.
*/
- @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(Collection<R> recs);
+ @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(@NotNull Collection<R> recs);
/**
* Executes an InvokeProcessor code against a record with the same key columns values as the given one has.
*
- * @param keyRec Record with key columns set.
+ * @param keyRec A record with key columns set.
+ * The record cannot be {@code null}.
* @param proc Invoke processor.
* @param <T> InvokeProcessor result type.
* @return Results of the processing.
*/
- <T extends Serializable> T invoke(R keyRec, InvokeProcessor<R, R, T> proc);
+ <T extends Serializable> T invoke(@NotNull R keyRec, InvokeProcessor<R, R, T> proc);
/**
* Asynchronously executes an InvokeProcessor code against a record
* with the same key columns values as the given one has.
*
- * @param keyRec Record with key columns set.
+ * @param keyRec A record with key columns set.
+ * The record cannot be {@code null}.
* @param proc Invoke processor.
* @param <T> InvokeProcessor result type.
* @return Future representing pending completion of the operation.
*/
- @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(R keyRec, InvokeProcessor<R, R, T> proc);
+ @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(@NotNull R keyRec, InvokeProcessor<R, R, T> proc);
/**
* Executes an InvokeProcessor code against records with the same key columns values as the given ones has.
*
* @param keyRecs Records with key columns set.
+ * The records cannot be {@code null}.
* @param proc Invoke processor.
* @param <T> InvokeProcessor result type.
* @return Results of the processing.
*/
- <T extends Serializable> Map<R, T> invokeAll(Collection<R> keyRecs, InvokeProcessor<R, R, T> proc);
+ <T extends Serializable> Map<R, T> invokeAll(@NotNull Collection<R> keyRecs, InvokeProcessor<R, R, T> proc);
/**
* Asynchronously executes an InvokeProcessor against records with the same key columns values as the given ones
* has.
*
* @param keyRecs Records with key columns set.
+ * The records cannot be {@code null}.
* @param proc Invoke processor.
* @param <T> InvokeProcessor result type.
* @return Results of the processing.
*/
- @NotNull <T extends Serializable> CompletableFuture<Map<R, T>> invokeAllAsync(Collection<R> keyRecs,
+ @NotNull <T extends Serializable> CompletableFuture<Map<R, T>> invokeAllAsync(@NotNull Collection<R> keyRecs,
InvokeProcessor<R, R, T> proc);
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
index 593ac59..72ac0ce 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ITDistributedTableTest.java
@@ -18,7 +18,9 @@
package org.apache.ignite.distributed;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -36,7 +38,7 @@ import org.apache.ignite.internal.schema.SchemaRegistry;
import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
-import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.lang.IgniteLogger;
@@ -59,6 +61,7 @@ import org.apache.ignite.raft.client.service.RaftGroupService;
import org.apache.ignite.raft.client.service.impl.RaftGroupServiceImpl;
import org.apache.ignite.raft.server.RaftServer;
import org.apache.ignite.raft.server.impl.RaftServerImpl;
+import org.apache.ignite.table.KeyValueBinaryView;
import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
import org.jetbrains.annotations.NotNull;
@@ -112,6 +115,9 @@ public class ITDistributedTableTest {
/** Cluster. */
private ArrayList<ClusterService> cluster = new ArrayList<>();
+ /**
+ * Starts all cluster nodes before each test.
+ */
@BeforeEach
public void beforeTest() {
for (int i = 0; i < NODES; i++) {
@@ -138,6 +144,11 @@ public class ITDistributedTableTest {
LOG.info("Client started.");
}
+ /**
+ * Shuts down all cluster nodes after each test.
+ *
+ * @throws Exception If failed.
+ */
@AfterEach
public void afterTest() throws Exception {
for (ClusterService node : cluster) {
@@ -147,6 +158,11 @@ public class ITDistributedTableTest {
client.shutdown();
}
+ /**
+ * Tests partition listener.
+ *
+ * @throws Exception If failed.
+ */
@Test
public void partitionListener() throws Exception {
String grpId = "part";
@@ -172,7 +188,7 @@ public class ITDistributedTableTest {
// Row keyChunk = new Row(SCHEMA, new ByteBufferRow(testRow.keySlice()));
Row keyChunk = getTestKey();
- CompletableFuture<KVGetResponse> getFut = partRaftGrp.run(new GetCommand(keyChunk));
+ CompletableFuture<SingleRowResponse> getFut = partRaftGrp.run(new GetCommand(keyChunk));
assertNotNull(getFut.get().getValue());
@@ -181,6 +197,11 @@ public class ITDistributedTableTest {
partSrv.shutdown();
}
+ /**
+ * Prepares a test row which contains one field.
+ *
+ * @return Row.
+ */
@NotNull private Row getTestKey() {
RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
@@ -189,6 +210,11 @@ public class ITDistributedTableTest {
return new Row(SCHEMA, new ByteBufferRow(rowBuilder.build()));
}
+ /**
+ * Prepares a test row which contains two fields.
+ *
+ * @return Row.
+ */
@NotNull private Row getTestRow() {
RowAssembler rowBuilder = new RowAssembler(SCHEMA, 4096, 0, 0);
@@ -198,6 +224,9 @@ public class ITDistributedTableTest {
return new Row(SCHEMA, new ByteBufferRow(rowBuilder.build()));
}
+ /**
+ * The test prepares a distributed table and checks operation over various views.
+ */
@Test
public void partitionedTable() {
HashMap<ClusterNode, RaftServer> raftServers = new HashMap<>(NODES);
@@ -250,71 +279,232 @@ public class ITDistributedTableTest {
}
});
- for (int i = 0; i < PARTS * 10; i++) {
- tbl.kvView().putIfAbsent(
- tbl.kvView().tupleBuilder()
+ partitionedTableView(tbl, PARTS * 10);
+
+ partitionedTableKVBinaryView(tbl.kvView(), PARTS * 10);
+ }
+
+ /**
+ * Checks operation over row table view.
+ *
+ * @param view Table view.
+ * @param keysCnt Count of keys.
+ */
+ public void partitionedTableView(Table view, int keysCnt) {
+ LOG.info("Test for Table view [keys=" + keysCnt + ']');
+
+ for (int i = 0; i < keysCnt; i++) {
+ view.insert(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .set("value", Long.valueOf(i + 2))
+ .build()
+ );
+ }
+
+ for (int i = 0; i < keysCnt; i++) {
+ Tuple entry = view.get(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+
+ assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+ }
+
+ for (int i = 0; i < keysCnt; i++) {
+ view.upsert(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .set("value", Long.valueOf(i + 5))
+ .build()
+ );
+
+ Tuple entry = view.get(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+
+ assertEquals(Long.valueOf(i + 5), entry.longValue("value"));
+ }
+
+ HashSet<Tuple> keys = new HashSet<>();
+
+ for (int i = 0; i < keysCnt; i++) {
+ keys.add(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+ }
+
+ Collection<Tuple> entries = view.getAll(keys);
+
+ assertEquals(keysCnt, entries.size());
+
+ for (int i = 0; i < keysCnt; i++) {
+ boolean res = view.replace(
+ view.tupleBuilder()
.set("key", Long.valueOf(i))
+ .set("value", Long.valueOf(i + 5))
.build(),
- tbl.kvView().tupleBuilder()
+ view.tupleBuilder()
+ .set("key", Long.valueOf(i))
.set("value", Long.valueOf(i + 2))
.build());
+
+ assertTrue(res);
+ }
+
+ for (int i = 0; i < keysCnt; i++) {
+ boolean res = view.delete(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+
+ assertTrue(res);
+
+ Tuple entry = view.get(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+
+ assertNull(entry);
}
- for (int i = 0; i < PARTS * 10; i++) {
- Tuple entry = tbl.kvView().get(
- tbl.kvView().tupleBuilder()
+ ArrayList<Tuple> batch = new ArrayList<>(keysCnt);
+
+ for (int i = 0; i < keysCnt; i++) {
+ batch.add(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .set("value", Long.valueOf(i + 2))
+ .build());
+ }
+
+ view.upsertAll(batch);
+
+ for (int i = 0; i < keysCnt; i++) {
+ Tuple entry = view.get(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+
+ assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+ }
+
+ view.deleteAll(keys);
+
+ for (Tuple key : keys) {
+ Tuple entry = view.get(key);
+
+ assertNull(entry);
+ }
+ }
+
+ /**
+ * Checks operation over key-value binary table view.
+ *
+ * @param view Table view.
+ * @param keysCnt Count of keys.
+ */
+ public void partitionedTableKVBinaryView(KeyValueBinaryView view, int keysCnt) {
+ LOG.info("Tes for Key-Value binary view [keys=" + keysCnt + ']');
+
+ for (int i = 0; i < keysCnt; i++) {
+ view.putIfAbsent(
+ view.tupleBuilder()
.set("key", Long.valueOf(i))
+ .build(),
+ view.tupleBuilder()
+ .set("value", Long.valueOf(i + 2))
.build());
+ }
- LOG.info("The result is [key=" + i + ", tuple=" + entry + ']');
+ for (int i = 0; i < keysCnt; i++) {
+ Tuple entry = view.get(
+ view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
}
- for (int i = 0; i < PARTS * 10; i++) {
- tbl.kvView().put(
- tbl.kvView().tupleBuilder()
+ for (int i = 0; i < keysCnt; i++) {
+ view.put(
+ view.tupleBuilder()
.set("key", Long.valueOf(i))
.build(),
- tbl.kvView().tupleBuilder()
+ view.tupleBuilder()
.set("value", Long.valueOf(i + 5))
.build());
- Tuple entry = tbl.kvView().get(
- tbl.kvView().tupleBuilder()
+ Tuple entry = view.get(
+ view.tupleBuilder()
.set("key", Long.valueOf(i))
.build());
assertEquals(Long.valueOf(i + 5), entry.longValue("value"));
}
- for (int i = 0; i < PARTS * 10; i++) {
- boolean res = tbl.kvView().replace(
- tbl.kvView().tupleBuilder()
+ HashSet<Tuple> keys = new HashSet<>();
+
+ for (int i = 0; i < keysCnt; i++) {
+ keys.add(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+ }
+
+ Map<Tuple, Tuple> entries = view.getAll(keys);
+
+ assertEquals(keysCnt, entries.size());
+
+ for (int i = 0; i < keysCnt; i++) {
+ boolean res = view.replace(
+ view.tupleBuilder()
.set("key", Long.valueOf(i))
.build(),
- tbl.kvView().tupleBuilder()
+ view.tupleBuilder()
.set("value", Long.valueOf(i + 5))
.build(),
- tbl.kvView().tupleBuilder()
+ view.tupleBuilder()
.set("value", Long.valueOf(i + 2))
.build());
assertTrue(res);
}
- for (int i = 0; i < PARTS * 10; i++) {
- boolean res = tbl.kvView().remove(
- tbl.kvView().tupleBuilder()
+ for (int i = 0; i < keysCnt; i++) {
+ boolean res = view.remove(
+ view.tupleBuilder()
.set("key", Long.valueOf(i))
.build());
assertTrue(res);
- Tuple entry = tbl.kvView().get(
- tbl.kvView().tupleBuilder()
+ Tuple entry = view.get(
+ view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+
+ assertNull(entry);
+ }
+
+ HashMap<Tuple, Tuple> batch = new HashMap<>(keysCnt);
+
+ for (int i = 0; i < keysCnt; i++) {
+ batch.put(
+ view.tupleBuilder()
.set("key", Long.valueOf(i))
+ .build(),
+ view.tupleBuilder()
+ .set("value", Long.valueOf(i + 2))
.build());
+ }
+
+ view.putAll(batch);
+
+ for (int i = 0; i < keysCnt; i++) {
+ Tuple entry = view.get(view.tupleBuilder()
+ .set("key", Long.valueOf(i))
+ .build());
+
+ assertEquals(Long.valueOf(i + 2), entry.longValue("value"));
+ }
+
+ view.removeAll(keys);
+
+ for (Tuple key : keys) {
+ Tuple entry = view.get(key);
assertNull(entry);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
index 31a578e..be1c3ac 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVBinaryViewImpl.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -35,9 +36,6 @@ import org.jetbrains.annotations.NotNull;
/**
* Key-value view implementation for binary user-object representation.
- *
- * @implNote Key-value {@link Tuple}s represents marshalled user-objects
- * regarding the binary object concept.
*/
public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinaryView {
/** Marshaller. */
@@ -56,12 +54,12 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public Tuple get(Tuple key) {
+ @Override public Tuple get(@NotNull Tuple key) {
return sync(getAsync(key));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> getAsync(Tuple key) {
+ @Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple key) {
Objects.requireNonNull(key);
Row kRow = marshaller().marshal(key, null); // Convert to portable format to pass TX/storage layer.
@@ -72,27 +70,31 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public Map<Tuple, Tuple> getAll(Collection<Tuple> keys) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Map<Tuple, Tuple> getAll(@NotNull Collection<Tuple> keys) {
+ return sync(getAllAsync(keys));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<Tuple, Tuple>> getAllAsync(Collection<Tuple> keys) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Map<Tuple, Tuple>> getAllAsync(@NotNull Collection<Tuple> keys) {
+ Objects.requireNonNull(keys);
+
+ return tbl.getAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+ .thenApply(this::wrap)
+ .thenApply(ts -> ts.stream().filter(Objects::nonNull).collect(Collectors.toMap(TableRow::keyChunk, TableRow::valueChunk)));
}
/** {@inheritDoc} */
- @Override public boolean contains(Tuple key) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public boolean contains(@NotNull Tuple key) {
+ return get(key) != null;
}
/** {@inheritDoc} */
- @Override public void put(Tuple key, Tuple val) {
+ @Override public void put(@NotNull Tuple key, Tuple val) {
sync(putAsync(key, val));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> putAsync(Tuple key, Tuple val) {
+ @Override public @NotNull CompletableFuture<Void> putAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
@@ -101,22 +103,27 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public void putAll(Map<Tuple, Tuple> pairs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public void putAll(@NotNull Map<Tuple, Tuple> pairs) {
+ sync(putAllAsync(pairs));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> putAllAsync(Map<Tuple, Tuple> pairs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Void> putAllAsync(@NotNull Map<Tuple, Tuple> pairs) {
+ Objects.requireNonNull(pairs);
+
+ return tbl.upsertAll(pairs.entrySet()
+ .stream()
+ .map(this::marshalPair)
+ .collect(Collectors.toList()));
}
/** {@inheritDoc} */
- @Override public Tuple getAndPut(Tuple key, Tuple val) {
+ @Override public Tuple getAndPut(@NotNull Tuple key, Tuple val) {
return sync(getAndPutAsync(key, val));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> getAndPutAsync(Tuple key, Tuple val) {
+ @Override public @NotNull CompletableFuture<Tuple> getAndPutAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
@@ -127,14 +134,13 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public boolean putIfAbsent(Tuple key, Tuple val) {
+ @Override public boolean putIfAbsent(@NotNull Tuple key, @NotNull Tuple val) {
return sync(putIfAbsentAsync(key, val));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(Tuple key, Tuple val) {
+ @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
- Objects.requireNonNull(val);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
@@ -142,12 +148,12 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public boolean remove(Tuple key) {
+ @Override public boolean remove(@NotNull Tuple key) {
return sync(removeAsync(key));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> removeAsync(Tuple key) {
+ @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull Tuple key) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, null); // Convert to portable format to pass TX/storage layer.
@@ -156,12 +162,12 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public boolean remove(Tuple key, Tuple val) {
+ @Override public boolean remove(@NotNull Tuple key, @NotNull Tuple val) {
return sync(removeAsync(key, val));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> removeAsync(Tuple key, Tuple val) {
+ @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull Tuple key, @NotNull Tuple val) {
Objects.requireNonNull(key);
Objects.requireNonNull(val);
@@ -171,32 +177,44 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public Collection<Tuple> removeAll(Collection<Tuple> keys) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Collection<Tuple> removeAll(@NotNull Collection<Tuple> keys) {
+ Objects.requireNonNull(keys);
+
+ return sync(removeAllAsync(keys));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> removeAllAsync(Collection<Tuple> keys) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> removeAllAsync(@NotNull Collection<Tuple> keys) {
+ Objects.requireNonNull(keys);
+
+ return tbl.deleteAll(keys.stream().map(k -> marsh.marshal(k, null)).collect(Collectors.toList()))
+ .thenApply(this::wrap)
+ .thenApply(t -> t.stream().filter(Objects::nonNull).map(TableRow::valueChunk).collect(Collectors.toList()));
}
/** {@inheritDoc} */
- @Override public Tuple getAndRemove(Tuple key) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Tuple getAndRemove(@NotNull Tuple key) {
+ Objects.requireNonNull(key);
+
+ return sync(getAndRemoveAsync(key));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(Tuple key) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Tuple> getAndRemoveAsync(@NotNull Tuple key) {
+ Objects.requireNonNull(key);
+
+ return tbl.getAndDelete(marsh.marshal(key, null))
+ .thenApply(this::wrap)
+ .thenApply(t -> t == null ? null : t.valueChunk());
}
/** {@inheritDoc} */
- @Override public boolean replace(Tuple key, Tuple val) {
+ @Override public boolean replace(@NotNull Tuple key, Tuple val) {
return sync(replaceAsync(key, val));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple key, Tuple val) {
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple key, Tuple val) {
Objects.requireNonNull(key);
Row row = marshaller().marshal(key, val); // Convert to portable format to pass TX/storage layer.
@@ -205,12 +223,12 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public boolean replace(Tuple key, Tuple oldVal, Tuple newVal) {
+ @Override public boolean replace(@NotNull Tuple key, Tuple oldVal, Tuple newVal) {
return sync(replaceAsync(key, oldVal, newVal));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple key, Tuple oldVal, Tuple newVal) {
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple key, Tuple oldVal, Tuple newVal) {
Objects.requireNonNull(key);
Row oldRow = marshaller().marshal(key, oldVal); // Convert to portable format to pass TX/storage layer.
@@ -220,18 +238,22 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
}
/** {@inheritDoc} */
- @Override public Tuple getAndReplace(Tuple key, Tuple val) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Tuple getAndReplace(@NotNull Tuple key, Tuple val) {
+ return sync(getAndReplaceAsync(key, val));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(Tuple key, Tuple val) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple key, Tuple val) {
+ Objects.requireNonNull(key);
+
+ return tbl.getAndReplace(marsh.marshal(key, val))
+ .thenApply(this::wrap)
+ .thenApply(t -> t == null ? null : t.valueChunk());
}
/** {@inheritDoc} */
@Override public <R extends Serializable> R invoke(
- Tuple key,
+ @NotNull Tuple key,
InvokeProcessor<Tuple, Tuple, R> proc,
Serializable... args
) {
@@ -240,7 +262,7 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
/** {@inheritDoc} */
@Override public @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
- Tuple key,
+ @NotNull Tuple key,
InvokeProcessor<Tuple, Tuple, R> proc,
Serializable... args
) {
@@ -249,7 +271,7 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
/** {@inheritDoc} */
@Override public <R extends Serializable> Map<Tuple, R> invokeAll(
- Collection<Tuple> keys,
+ @NotNull Collection<Tuple> keys,
InvokeProcessor<Tuple, Tuple, R> proc,
Serializable... args
) {
@@ -258,7 +280,7 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
/** {@inheritDoc} */
@Override public @NotNull <R extends Serializable> CompletableFuture<Map<Tuple, R>> invokeAllAsync(
- Collection<Tuple> keys,
+ @NotNull Collection<Tuple> keys,
InvokeProcessor<Tuple, Tuple, R> proc,
Serializable... args
) {
@@ -289,4 +311,27 @@ public class KVBinaryViewImpl extends AbstractTableView implements KeyValueBinar
return new TableRow(schema, new Row(schema, row));
}
+
+ /**
+ * Wraps a collection of binary rows into a collection of table rows.
+ *
+ * @param rows Binary rows.
+ * @return Table rows.
+ */
+ private Collection<TableRow> wrap(Collection<BinaryRow> rows) {
+ if (rows == null)
+ return null;
+
+ return rows.stream().map(this::wrap).collect(Collectors.toSet());
+ }
+
+ /**
+ * Marshals a key-value pair into the table row.
+ *
+ * @param pair A map entry representing the key-value pair.
+ * @return Row.
+ */
+ private Row marshalPair(Map.Entry<Tuple, Tuple> pair) {
+ return marshaller().marshal(pair.getKey(), pair.getValue());
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
index 6d3d10b..c744e93 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/KVViewImpl.java
@@ -51,12 +51,12 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
}
/** {@inheritDoc} */
- @Override public V get(K key) {
+ @Override public V get(@NotNull K key) {
return sync(getAsync(key));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<V> getAsync(K key) {
+ @Override public @NotNull CompletableFuture<V> getAsync(@NotNull K key) {
Objects.requireNonNull(key);
final KVSerializer<K, V> marsh = marshaller();
@@ -69,138 +69,138 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
}
/** {@inheritDoc} */
- @Override public Map<K, V> getAll(Collection<K> keys) {
+ @Override public Map<K, V> getAll(@NotNull Collection<K> keys) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Map<K, V>> getAllAsync(Collection<K> keys) {
+ @Override public @NotNull CompletableFuture<Map<K, V>> getAllAsync(@NotNull Collection<K> keys) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean contains(K key) {
+ @Override public boolean contains(@NotNull K key) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public void put(K key, V val) {
+ @Override public void put(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> putAsync(K key, V val) {
+ @Override public @NotNull CompletableFuture<Void> putAsync(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public void putAll(Map<K, V> pairs) {
+ @Override public void putAll(@NotNull Map<K, V> pairs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> putAllAsync(Map<K, V> pairs) {
+ @Override public @NotNull CompletableFuture<Void> putAllAsync(@NotNull Map<K, V> pairs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public V getAndPut(K key, V val) {
+ @Override public V getAndPut(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<V> getAndPutAsync(K key, V val) {
+ @Override public @NotNull CompletableFuture<V> getAndPutAsync(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean putIfAbsent(K key, V val) {
+ @Override public boolean putIfAbsent(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(K key, V val) {
+ @Override public @NotNull CompletableFuture<Boolean> putIfAbsentAsync(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean remove(K key) {
+ @Override public boolean remove(@NotNull K key) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> removeAsync(K key) {
+ @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean remove(K key, V val) {
+ @Override public boolean remove(@NotNull K key, @NotNull V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> removeAsync(K key, V val) {
+ @Override public @NotNull CompletableFuture<Boolean> removeAsync(@NotNull K key, @NotNull V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public Collection<K> removeAll(Collection<K> keys) {
+ @Override public Collection<K> removeAll(@NotNull Collection<K> keys) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<K> removeAllAsync(Collection<K> keys) {
+ @Override public @NotNull CompletableFuture<Collection<K>> removeAllAsync(@NotNull Collection<K> keys) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public V getAndRemove(K key) {
+ @Override public V getAndRemove(@NotNull K key) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<V> getAndRemoveAsync(K key) {
+ @Override public @NotNull CompletableFuture<V> getAndRemoveAsync(@NotNull K key) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean replace(K key, V val) {
+ @Override public boolean replace(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> replaceAsync(K key, V val) {
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean replace(K key, V oldVal, V newVal) {
+ @Override public boolean replace(@NotNull K key, V oldVal, V newVal) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> replaceAsync(K key, V oldVal, V newVal) {
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull K key, V oldVal, V newVal) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public V getAndReplace(K key, V val) {
+ @Override public V getAndReplace(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<V> getAndReplaceAsync(K key, V val) {
+ @Override public @NotNull CompletableFuture<V> getAndReplaceAsync(@NotNull K key, V val) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public <R extends Serializable> R invoke(K key, InvokeProcessor<K, V, R> proc, Serializable... args) {
+ @Override public <R extends Serializable> R invoke(@NotNull K key, InvokeProcessor<K, V, R> proc, Serializable... args) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public @NotNull <R extends Serializable> CompletableFuture<R> invokeAsync(
- K key,
+ @NotNull K key,
InvokeProcessor<K, V, R> proc,
Serializable... args
) {
@@ -209,7 +209,7 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
/** {@inheritDoc} */
@Override public <R extends Serializable> Map<K, R> invokeAll(
- Collection<K> keys,
+ @NotNull Collection<K> keys,
InvokeProcessor<K, V, R> proc,
Serializable... args
) {
@@ -218,7 +218,7 @@ public class KVViewImpl<K, V> extends AbstractTableView implements KeyValueView<
/** {@inheritDoc} */
@Override public @NotNull <R extends Serializable> CompletableFuture<Map<K, R>> invokeAllAsync(
- Collection<K> keys,
+ @NotNull Collection<K> keys,
InvokeProcessor<K, V, R> proc, Serializable... args
) {
throw new UnsupportedOperationException("Not implemented yet.");
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
index f442bbb..27bdc44 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/RecordViewImpl.java
@@ -66,12 +66,12 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R
}
/** {@inheritDoc} */
- @Override public R get(R keyRec) {
+ @Override public R get(@NotNull R keyRec) {
return sync(getAsync(keyRec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<R> getAsync(R keyRec) {
+ @Override public @NotNull CompletableFuture<R> getAsync(@NotNull R keyRec) {
Objects.requireNonNull(keyRec);
RecordSerializer<R> marsh = serializer();
@@ -84,159 +84,161 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R
}
/** {@inheritDoc} */
- @Override public Collection<R> getAll(Collection<R> keyRecs) {
+ @Override public Collection<R> getAll(@NotNull Collection<R> keyRecs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Collection<R>> getAllAsync(Collection<R> keyRecs) {
+ @Override public @NotNull CompletableFuture<Collection<R>> getAllAsync(@NotNull Collection<R> keyRecs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public void upsert(R rec) {
+ @Override public void upsert(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> upsertAsync(R rec) {
+ @Override public @NotNull CompletableFuture<Void> upsertAsync(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public void upsertAll(Collection<R> recs) {
+ @Override public void upsertAll(@NotNull Collection<R> recs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> upsertAllAsync(Collection<R> recs) {
+ @Override public @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<R> recs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public R getAndUpsert(R rec) {
+ @Override public R getAndUpsert(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<R> getAndUpsertAsync(R rec) {
+ @Override public @NotNull CompletableFuture<R> getAndUpsertAsync(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean insert(R rec) {
+ @Override public boolean insert(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> insertAsync(R rec) {
+ @Override public @NotNull CompletableFuture<Boolean> insertAsync(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public Collection<R> insertAll(Collection<R> recs) {
+ @Override public Collection<R> insertAll(@NotNull Collection<R> recs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Collection<R>> insertAllAsync(Collection<R> recs) {
+ @Override public @NotNull CompletableFuture<Collection<R>> insertAllAsync(@NotNull Collection<R> recs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean replace(R rec) {
+ @Override public boolean replace(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> replaceAsync(R rec) {
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean replace(R oldRec, R newRec) {
+ @Override public boolean replace(@NotNull R oldRec, @NotNull R newRec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> replaceAsync(R oldRec, R newRec) {
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull R oldRec, @NotNull R newRec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public R getAndReplace(R rec) {
+ @Override public R getAndReplace(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<R> getAndReplaceAsync(R rec) {
+ @Override public @NotNull CompletableFuture<R> getAndReplaceAsync(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean delete(R keyRec) {
+ @Override public boolean delete(@NotNull R keyRec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> deleteAsync(R keyRec) {
+ @Override public @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull R keyRec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public boolean deleteExact(R rec) {
+ @Override public boolean deleteExact(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(R rec) {
+ @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public R getAndDelete(R rec) {
+ @Override public R getAndDelete(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<R> getAndDeleteAsync(R rec) {
+ @Override public @NotNull CompletableFuture<R> getAndDeleteAsync(@NotNull R rec) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public Collection<R> deleteAll(Collection<R> recs) {
+ @Override public Collection<R> deleteAll(@NotNull Collection<R> recs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Collection<R>> deleteAllAsync(Collection<R> recs) {
+ @Override public @NotNull CompletableFuture<Collection<R>> deleteAllAsync(@NotNull Collection<R> recs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public Collection<R> deleteAllExact(Collection<R> recs) {
+ @Override public Collection<R> deleteAllExact(@NotNull Collection<R> recs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(Collection<R> recs) {
+ @Override public @NotNull CompletableFuture<Collection<R>> deleteAllExactAsync(@NotNull Collection<R> recs) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public <T extends Serializable> T invoke(R keyRec, InvokeProcessor<R, R, T> proc) {
+ @Override public <T extends Serializable> T invoke(@NotNull R keyRec, InvokeProcessor<R, R, T> proc) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
- @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(R keyRec,
- InvokeProcessor<R, R, T> proc) {
+ @Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(
+ @NotNull R keyRec,
+ InvokeProcessor<R, R, T> proc
+ ) {
throw new UnsupportedOperationException("Not implemented yet.");
}
/** {@inheritDoc} */
@Override public <T extends Serializable> Map<R, T> invokeAll(
- Collection<R> keyRecs,
+ @NotNull Collection<R> keyRecs,
InvokeProcessor<R, R, T> proc
) {
throw new UnsupportedOperationException("Not implemented yet.");
@@ -244,7 +246,7 @@ public class RecordViewImpl<R> extends AbstractTableView implements RecordView<R
/** {@inheritDoc} */
@Override public @NotNull <T extends Serializable> CompletableFuture<Map<R, T>> invokeAllAsync(
- Collection<R> keyRecs,
+ @NotNull Collection<R> keyRecs,
InvokeProcessor<R, R, T> proc
) {
throw new UnsupportedOperationException("Not implemented yet.");
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index 58ba9cc..a7cee35 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.table;
import java.io.Serializable;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Row;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -92,12 +94,12 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public Tuple get(Tuple keyRec) {
+ @Override public Tuple get(@NotNull Tuple keyRec) {
return sync(getAsync(keyRec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> getAsync(Tuple keyRec) {
+ @Override public @NotNull CompletableFuture<Tuple> getAsync(@NotNull Tuple keyRec) {
Objects.requireNonNull(keyRec);
final Row keyRow = marshaller().marshal(keyRec, null); // Convert to portable format to pass TX/storage layer.
@@ -106,22 +108,32 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public Collection<Tuple> getAll(Collection<Tuple> keyRecs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Collection<Tuple> getAll(@NotNull Collection<Tuple> keyRecs) {
+ return sync(getAllAsync(keyRecs));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(Collection<Tuple> keyRecs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> getAllAsync(@NotNull Collection<Tuple> keyRecs) {
+ Objects.requireNonNull(keyRecs);
+
+ HashSet<BinaryRow> keys = new HashSet<>(keyRecs.size());
+
+ for (Tuple keyRec : keyRecs) {
+ final Row keyRow = marshaller().marshal(keyRec, null);
+
+ keys.add(keyRow);
+ }
+
+ return tbl.getAll(keys).thenApply(this::wrap);
}
/** {@inheritDoc} */
- @Override public void upsert(Tuple rec) {
+ @Override public void upsert(@NotNull Tuple rec) {
sync(upsertAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> upsertAsync(Tuple rec) {
+ @Override public @NotNull CompletableFuture<Void> upsertAsync(@NotNull Tuple rec) {
Objects.requireNonNull(rec);
final Row keyRow = marshaller().marshal(rec);
@@ -130,22 +142,32 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public void upsertAll(Collection<Tuple> recs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public void upsertAll(@NotNull Collection<Tuple> recs) {
+ sync(upsertAllAsync(recs));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Void> upsertAllAsync(Collection<Tuple> recs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Void> upsertAllAsync(@NotNull Collection<Tuple> recs) {
+ Objects.requireNonNull(recs);
+
+ HashSet<BinaryRow> keys = new HashSet<>(recs.size());
+
+ for (Tuple keyRec : recs) {
+ final Row keyRow = marshaller().marshal(keyRec);
+
+ keys.add(keyRow);
+ }
+
+ return tbl.upsertAll(keys);
}
/** {@inheritDoc} */
- @Override public Tuple getAndUpsert(Tuple rec) {
+ @Override public Tuple getAndUpsert(@NotNull Tuple rec) {
return sync(getAndUpsertAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(Tuple rec) {
+ @Override public @NotNull CompletableFuture<Tuple> getAndUpsertAsync(@NotNull Tuple rec) {
Objects.requireNonNull(rec);
final Row keyRow = marshaller().marshal(rec);
@@ -154,12 +176,12 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public boolean insert(Tuple rec) {
+ @Override public boolean insert(@NotNull Tuple rec) {
return sync(insertAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> insertAsync(Tuple rec) {
+ @Override public @NotNull CompletableFuture<Boolean> insertAsync(@NotNull Tuple rec) {
Objects.requireNonNull(rec);
final Row keyRow = marshaller().marshal(rec);
@@ -168,22 +190,32 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public Collection<Tuple> insertAll(Collection<Tuple> recs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Collection<Tuple> insertAll(@NotNull Collection<Tuple> recs) {
+ return sync(insertAllAsync(recs));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(Collection<Tuple> recs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> insertAllAsync(@NotNull Collection<Tuple> recs) {
+ Objects.requireNonNull(recs);
+
+ HashSet<BinaryRow> keys = new HashSet<>(recs.size());
+
+ for (Tuple keyRec : recs) {
+ final Row keyRow = marshaller().marshal(keyRec);
+
+ keys.add(keyRow);
+ }
+
+ return tbl.insertAll(keys).thenApply(this::wrap);
}
/** {@inheritDoc} */
- @Override public boolean replace(Tuple rec) {
+ @Override public boolean replace(@NotNull Tuple rec) {
return sync(replaceAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple rec) {
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple rec) {
Objects.requireNonNull(rec);
final Row keyRow = marshaller().marshal(rec);
@@ -192,12 +224,12 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public boolean replace(Tuple oldRec, Tuple newRec) {
+ @Override public boolean replace(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
return sync(replaceAsync(oldRec, newRec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> replaceAsync(Tuple oldRec, Tuple newRec) {
+ @Override public @NotNull CompletableFuture<Boolean> replaceAsync(@NotNull Tuple oldRec, @NotNull Tuple newRec) {
Objects.requireNonNull(oldRec);
Objects.requireNonNull(newRec);
@@ -208,22 +240,26 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public Tuple getAndReplace(Tuple rec) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Tuple getAndReplace(@NotNull Tuple rec) {
+ return sync(getAndReplaceAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(Tuple rec) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Tuple> getAndReplaceAsync(@NotNull Tuple rec) {
+ Objects.requireNonNull(rec);
+
+ final Row keyRow = marshaller().marshal(rec);
+
+ return tbl.getAndReplace(keyRow).thenApply(this::wrap);
}
/** {@inheritDoc} */
- @Override public boolean delete(Tuple keyRec) {
+ @Override public boolean delete(@NotNull Tuple keyRec) {
return sync(deleteAsync(keyRec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> deleteAsync(Tuple keyRec) {
+ @Override public @NotNull CompletableFuture<Boolean> deleteAsync(@NotNull Tuple keyRec) {
Objects.requireNonNull(keyRec);
final Row keyRow = marshaller().marshal(keyRec, null);
@@ -232,12 +268,12 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public boolean deleteExact(Tuple rec) {
+ @Override public boolean deleteExact(@NotNull Tuple rec) {
return sync(deleteExactAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(Tuple rec) {
+ @Override public @NotNull CompletableFuture<Boolean> deleteExactAsync(@NotNull Tuple rec) {
Objects.requireNonNull(rec);
final Row row = marshaller().marshal(rec);
@@ -246,39 +282,64 @@ public class TableImpl extends AbstractTableView implements Table {
}
/** {@inheritDoc} */
- @Override public Tuple getAndDelete(Tuple rec) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Tuple getAndDelete(@NotNull Tuple rec) {
+ return sync(getAndDeleteAsync(rec));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(Tuple rec) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Tuple> getAndDeleteAsync(@NotNull Tuple rec) {
+ Objects.requireNonNull(rec);
+
+ final Row row = marshaller().marshal(rec);
+
+ return tbl.getAndDelete(row).thenApply(this::wrap);
}
/** {@inheritDoc} */
- @Override public Collection<Tuple> deleteAll(Collection<Tuple> recs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Collection<Tuple> deleteAll(@NotNull Collection<Tuple> recs) {
+ return sync(deleteAllAsync(recs));
}
/** {@inheritDoc} */
- @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(Collection<Tuple> recs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllAsync(@NotNull Collection<Tuple> recs) {
+ Objects.requireNonNull(recs);
+
+ HashSet<BinaryRow> keys = new HashSet<>(recs.size());
+
+ for (Tuple keyRec : recs) {
+ final Row keyRow = marshaller().marshal(keyRec, null);
+
+ keys.add(keyRow);
+ }
+
+ return tbl.deleteAll(keys).thenApply(this::wrap);
}
/** {@inheritDoc} */
- @Override public Collection<Tuple> deleteAllExact(Collection<Tuple> recs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @Override public Collection<Tuple> deleteAllExact(@NotNull Collection<Tuple> recs) {
+ return sync(deleteAllExactAsync(recs));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<Tuple>> deleteAllExactAsync(
- Collection<Tuple> recs) {
- throw new UnsupportedOperationException("Not implemented yet.");
+ @NotNull Collection<Tuple> recs
+ ) {
+ Objects.requireNonNull(recs);
+
+ HashSet<BinaryRow> keys = new HashSet<>(recs.size());
+
+ for (Tuple keyRec : recs) {
+ final Row keyRow = marshaller().marshal(keyRec);
+
+ keys.add(keyRow);
+ }
+
+ return tbl.deleteAllExact(keys).thenApply(this::wrap);
}
/** {@inheritDoc} */
@Override public <T extends Serializable> T invoke(
- Tuple keyRec,
+ @NotNull Tuple keyRec,
InvokeProcessor<Tuple, Tuple, T> proc
) {
throw new UnsupportedOperationException("Not implemented yet.");
@@ -286,7 +347,7 @@ public class TableImpl extends AbstractTableView implements Table {
/** {@inheritDoc} */
@Override public @NotNull <T extends Serializable> CompletableFuture<T> invokeAsync(
- Tuple keyRec,
+ @NotNull Tuple keyRec,
InvokeProcessor<Tuple, Tuple, T> proc
) {
throw new UnsupportedOperationException("Not implemented yet.");
@@ -294,7 +355,7 @@ public class TableImpl extends AbstractTableView implements Table {
/** {@inheritDoc} */
@Override public <T extends Serializable> Map<Tuple, T> invokeAll(
- Collection<Tuple> keyRecs,
+ @NotNull Collection<Tuple> keyRecs,
InvokeProcessor<Tuple, Tuple, T> proc
) {
throw new UnsupportedOperationException("Not implemented yet.");
@@ -302,7 +363,7 @@ public class TableImpl extends AbstractTableView implements Table {
/** {@inheritDoc} */
@Override public @NotNull <T extends Serializable> CompletableFuture<Map<Tuple, T>> invokeAllAsync(
- Collection<Tuple> keyRecs,
+ @NotNull Collection<Tuple> keyRecs,
InvokeProcessor<Tuple, Tuple, T> proc
) {
throw new UnsupportedOperationException("Not implemented yet.");
@@ -332,4 +393,15 @@ public class TableImpl extends AbstractTableView implements Table {
return new TableRow(schema, new Row(schema, row));
}
+
+ /**
+ * @param rows Binary rows.
+ * @return Table rows.
+ */
+ private Collection<Tuple> wrap(Collection<BinaryRow> rows) {
+ if (rows == null)
+ return null;
+
+ return rows.stream().map(this::wrap).collect(Collectors.toSet());
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
new file mode 100644
index 0000000..054df56
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.lang.IgniteLogger;
+
+/**
+ * This is a utility class for serializing cache tuples. It will be removed once another way of serialization is
+ * implemented in the network layer.
+ * TODO: Remove it after (IGNITE-14793)
+ */
+public class CommandUtils {
+ /** The logger. */
+ private static final IgniteLogger LOG = IgniteLogger.forClass(CommandUtils.class);
+
+ /**
+ * Writes a collection of rows to a byte array.
+ *
+ * @param rows Collection of rows.
+ * @param consumer Byte array consumer.
+ */
+ public static void rowsToBytes(Collection<BinaryRow> rows, Consumer<byte[]> consumer) {
+ if (rows == null || rows.isEmpty())
+ return;
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ for (BinaryRow row : rows) {
+ rowToBytes(row, bytes -> {
+ try {
+ baos.write(intToBytes(bytes.length));
+
+ baos.write(bytes);
+ }
+ catch (IOException e) {
+ LOG.error("Could not write row to stream [row=" + row + ']', e);
+ }
+
+ });
+ }
+
+ baos.flush();
+
+ consumer.accept(baos.toByteArray());
+ }
+ catch (IOException e) {
+ LOG.error("Could not write rows to stream [rows=" + rows.size() + ']', e);
+
+ consumer.accept(null);
+ }
+ }
+
+ /**
+ * Writes a row to byte array.
+ *
+ * @param row Row.
+ * @param consumer Byte array consumer.
+ */
+ public static void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
+ if (row == null)
+ return;
+
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+ row.writeTo(baos);
+
+ baos.flush();
+
+ consumer.accept(baos.toByteArray());
+ }
+ catch (IOException e) {
+ LOG.error("Could not write row to stream [row=" + row + ']', e);
+
+ consumer.accept(null);
+ }
+ }
+
+ /**
+ * Reads binary rows from a byte array.
+ *
+ * @param bytes Byte array.
+ * @param consumer Consumer for binary row.
+ */
+ public static void readRows(byte[] bytes, Consumer<BinaryRow> consumer) {
+ if (bytes == null || bytes.length == 0)
+ return;
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
+ byte[] lenBytes = new byte[4];
+
+ byte[] rowBytes;
+
+ int read;
+
+ while ((read = bais.read(lenBytes)) != -1) {
+ assert read == 4;
+
+ int len = bytesToInt(lenBytes);
+
+ assert len > 0;
+
+ rowBytes = new byte[len];
+
+ read = bais.read(rowBytes);
+
+ assert read == len;
+
+ consumer.accept(new ByteBufferRow(rowBytes));
+ }
+ }
+ catch (IOException e) {
+ LOG.error("Could not read rows from stream.", e);
+ }
+ }
+
+ /**
+ * Serializes an integer to the byte array.
+ *
+ * @param i Integer value.
+ * @return Byte array.
+ */
+ private static byte[] intToBytes(int i) {
+ byte[] arr = new byte[4];
+ ByteBuffer.wrap(arr).putInt(i);
+ return arr;
+ }
+
+ /**
+ * Deserializes a byte array to the integer.
+ *
+ * @param bytes Byte array.
+ * @return Integer value.
+ */
+ private static int bytesToInt(byte[] bytes) {
+ return ByteBuffer.wrap(bytes).getInt();
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
new file mode 100644
index 0000000..167b58d
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command deletes entries by the passed keys.
+ */
+public class DeleteAllCommand implements WriteCommand {
+ /** Binary rows. */
+ private transient Set<BinaryRow> rows;
+
+ /*
+ * Row bytes.
+ * It is a temporary solution until the network layer implements proper serialization of BinaryRow.
+ * TODO: Remove the field after (IGNITE-14793).
+ */
+ private byte[] rowsBytes;
+
+ /**
+ * Creates a new instance of DeleteAllCommand with the given set of keys to be deleted.
+ * The {@code keyRows} should not be {@code null} or empty.
+ *
+ * @param keyRows Collection of binary row keys to be deleted.
+ */
+ public DeleteAllCommand(@NotNull Set<BinaryRow> keyRows) {
+ assert keyRows != null && !keyRows.isEmpty();
+
+ this.rows = keyRows;
+
+ CommandUtils.rowsToBytes(keyRows, bytes -> rowsBytes = bytes);
+ }
+
+ /**
+ * Returns a set of binary key rows to be deleted.
+ *
+ * @return Binary keys.
+ */
+ public Set<BinaryRow> getRows() {
+ if (rows == null && rowsBytes != null) {
+ rows = new HashSet<>();
+
+ CommandUtils.readRows(rowsBytes, rows::add);
+ }
+
+ return rows;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
index eabe187..f035eb8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
@@ -17,12 +17,8 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -30,55 +26,34 @@ import org.jetbrains.annotations.NotNull;
* The command deletes a entry by passed key.
*/
public class DeleteCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(DeleteCommand.class);
-
- /** Key row. */
+ /** Binary key row. */
private transient BinaryRow keyRow;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] keyRowBytes;
/**
- * @param keyRow Key row.
+ * Creates a new instance of DeleteCommand with the given key to be deleted.
+ * The {@code keyRow} should not be {@code null}.
+ *
+ * @param keyRow Binary key row.
*/
public DeleteCommand(@NotNull BinaryRow keyRow) {
assert keyRow != null;
this.keyRow = keyRow;
- rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
- }
-
- /**
- * Writes a row to byte array.
- *
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
}
/**
- * Gets a key row.
+ * Gets a binary key row to be deleted.
*
- * @return Key row.
+ * @return Binary key.
*/
public BinaryRow getKeyRow() {
if (keyRow == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
new file mode 100644
index 0000000..65846a1
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command deletes entries that are exactly the same as the rows passed.
+ */
+public class DeleteExactAllCommand implements WriteCommand {
+ /** Binary rows. */
+ private transient Set<BinaryRow> rows;
+
+ /*
+ * Row bytes.
+ * It is a temporary solution until the network layer implements proper serialization of BinaryRow.
+ * TODO: Remove the field after (IGNITE-14793).
+ */
+ private byte[] rowsBytes;
+
+ /**
+ * Creates a new instance of DeleteExactAllCommand with the given set of rows to be deleted.
+ * The {@code rows} should not be {@code null} or empty.
+ *
+ * @param rows Binary rows.
+ */
+ public DeleteExactAllCommand(@NotNull Set<BinaryRow> rows) {
+ assert rows != null && !rows.isEmpty();
+
+ this.rows = rows;
+
+ CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+ }
+
+ /**
+ * Gets a set of binary rows to be deleted.
+ *
+ * @return Binary rows.
+ */
+ public Set<BinaryRow> getRows() {
+ if (rows == null && rowsBytes != null) {
+ rows = new HashSet<>();
+
+ CommandUtils.readRows(rowsBytes, rows::add);
+ }
+
+ return rows;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactCommand.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactCommand.java
index be6ddcc..ae50136 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteExactCommand.java
@@ -17,68 +17,43 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
/**
- * The command inserts a row.
+ * The command deletes an entry that is exactly the same as the row passed.
*/
-public class InsertCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
- /** Row. */
+public class DeleteExactCommand implements WriteCommand {
+ /** Binary row. */
private transient BinaryRow row;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] rowBytes;
/**
- * @param row Row.
+ * Creates a new instance of DeleteExactCommand with the given row to be deleted.
+ * The {@code row} should not be {@code null}.
+ *
+ * @param row Binary row.
*/
- public InsertCommand(@NotNull BinaryRow row) {
+ public DeleteExactCommand(@NotNull BinaryRow row) {
assert row != null;
this.row = row;
- rowToBytes(row, bytes -> rowBytes = bytes);
+ CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
}
/**
- * Writes a row to byte array.
+ * Gets a binary row to be deleted.
*
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
- }
-
- /**
- * Gets a data row.
- *
- * @return Data row.
+ * @return Binary row.
*/
public BinaryRow getRow() {
if (row == null)
@@ -86,4 +61,5 @@ public class InsertCommand implements WriteCommand {
return row;
}
+
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAllCommand.java
new file mode 100644
index 0000000..72eeddb
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.ReadCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * This is a command for the batch get operation.
+ */
+public class GetAllCommand implements ReadCommand {
+ /** Binary key rows. The field is {@code transient}; {@code getKeyRows()} rebuilds it from the row bytes when it is {@code null}. */
+ private transient Set<BinaryRow> keyRows;
+
+ /*
+ * Serialized key row bytes, filled in by the constructor through CommandUtils.rowsToBytes.
+ * It is a temporary solution until the network implements correct serialization of BinaryRow.
+ * TODO: Remove the field after (IGNITE-14793).
+ */
+ private byte[] keyRowsBytes;
+
+ /**
+ * Creates a new instance of GetAllCommand with the given keys to get.
+ * The {@code keyRows} must not be {@code null} or empty (checked by an assertion only).
+ *
+ * @param keyRows Binary key rows; the set is stored by reference and is also written to the row bytes.
+ */
+ public GetAllCommand(@NotNull Set<BinaryRow> keyRows) {
+ assert keyRows != null && !keyRows.isEmpty();
+
+ this.keyRows = keyRows;
+
+ CommandUtils.rowsToBytes(keyRows, bytes -> keyRowsBytes = bytes);
+ }
+
+ /**
+ * Gets a set of binary key rows to get, reading it from the row bytes on first access if the set is not present.
+ *
+ * @return Binary key rows, or {@code null} if neither the key rows nor the row bytes are present.
+ */
+ public Set<BinaryRow> getKeyRows() {
+ if (keyRows == null && keyRowsBytes != null) {
+ keyRows = new HashSet<>();
+
+ CommandUtils.readRows(keyRowsBytes, keyRows::add);
+ }
+
+ return keyRows;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndDeleteCommand.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndDeleteCommand.java
index eabe187..bf4e40b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndDeleteCommand.java
@@ -17,68 +17,43 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
/**
- * The command deletes a entry by passed key.
+ * This is a command to get a value before deleting it.
*/
-public class DeleteCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(DeleteCommand.class);
-
- /** Key row. */
+public class GetAndDeleteCommand implements WriteCommand {
+ /** Binary key row. */
private transient BinaryRow keyRow;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] keyRowBytes;
/**
- * @param keyRow Key row.
+ * Creates a new instance of GetAndDeleteCommand with the given key to be got and deleted.
+ * The {@code keyRow} should not be {@code null}.
+ *
+ * @param keyRow Binary key row.
*/
- public DeleteCommand(@NotNull BinaryRow keyRow) {
+ public GetAndDeleteCommand(@NotNull BinaryRow keyRow) {
assert keyRow != null;
this.keyRow = keyRow;
- rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
+ CommandUtils.rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
}
/**
- * Writes a row to byte array.
+ * Gets a binary key row to be got and deleted.
*
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
- }
-
- /**
- * Gets a key row.
- *
- * @return Key row.
+ * @return Binary key.
*/
public BinaryRow getKeyRow() {
if (keyRow == null)
@@ -86,5 +61,4 @@ public class DeleteCommand implements WriteCommand {
return keyRow;
}
-
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndReplaceCommand.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndReplaceCommand.java
index be6ddcc..d70ebe1 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndReplaceCommand.java
@@ -17,68 +17,43 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
/**
- * The command inserts a row.
+ * This is a command to get a value before replacing it.
*/
-public class InsertCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
- /** Row. */
+public class GetAndReplaceCommand implements WriteCommand {
+ /** Binary row. */
private transient BinaryRow row;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] rowBytes;
/**
- * @param row Row.
+ * Creates a new instance of GetAndReplaceCommand with the given row to be got and replaced.
+ * The {@code row} should not be {@code null}.
+ *
+ * @param row Binary row.
*/
- public InsertCommand(@NotNull BinaryRow row) {
+ public GetAndReplaceCommand(@NotNull BinaryRow row) {
assert row != null;
this.row = row;
- rowToBytes(row, bytes -> rowBytes = bytes);
- }
-
- /**
- * Writes a row to byte array.
- *
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
}
/**
- * Gets a data row.
+ * Gets a binary row to be got and replaced.
*
- * @return Data row.
+ * @return Binary row.
*/
public BinaryRow getRow() {
if (row == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndUpsertCommand.java
similarity index 55%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndUpsertCommand.java
index eabe187..55572f6 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/DeleteCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetAndUpsertCommand.java
@@ -17,68 +17,43 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
/**
- * The command deletes a entry by passed key.
+ * This is a command to get a value before upserting it.
*/
-public class DeleteCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(DeleteCommand.class);
-
- /** Key row. */
+public class GetAndUpsertCommand implements WriteCommand {
+ /** Binary key row. */
private transient BinaryRow keyRow;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] keyRowBytes;
/**
- * @param keyRow Key row.
- */
- public DeleteCommand(@NotNull BinaryRow keyRow) {
- assert keyRow != null;
-
- this.keyRow = keyRow;
-
- rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
- }
-
- /**
- * Writes a row to byte array.
+ * Creates a new instance of GetAndUpsertCommand with the given row to be got and upserted.
+ * The {@code row} should not be {@code null}.
*
- * @param row Row.
- * @param consumer Byte array consumer.
+ * @param row Binary row.
*/
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
+ public GetAndUpsertCommand(@NotNull BinaryRow row) {
+ assert row != null;
- baos.flush();
+ this.keyRow = row;
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(row, bytes -> keyRowBytes = bytes);
}
/**
- * Gets a key row.
+ * Gets a binary row to be got and upserted.
*
- * @return Key row.
+ * @return Binary row.
*/
public BinaryRow getKeyRow() {
if (keyRow == null)
@@ -86,5 +61,4 @@ public class DeleteCommand implements WriteCommand {
return keyRow;
}
-
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java
index 597aba3..286257b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/GetCommand.java
@@ -17,12 +17,8 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.ReadCommand;
import org.jetbrains.annotations.NotNull;
@@ -30,55 +26,34 @@ import org.jetbrains.annotations.NotNull;
* The command gets a value by key specified.
*/
public class GetCommand implements ReadCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
- /** Key row. */
+ /** Binary key row. */
private transient BinaryRow keyRow;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] keyRowBytes;
/**
- * @param keyRow Key row.
+ * Creates a new instance of GetCommand with the given key to be got.
+ * The {@code keyRow} should not be {@code null}.
+ *
+ * @param keyRow Binary key row.
*/
public GetCommand(@NotNull BinaryRow keyRow) {
assert keyRow != null;
this.keyRow = keyRow;
- rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
- }
-
- /**
- * Writes a row to byte array.
- *
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(keyRow, bytes -> keyRowBytes = bytes);
}
/**
- * Gets a key row.
+ * Gets a binary key row to be got.
*
- * @return Key row.
+ * @return Binary key.
*/
public BinaryRow getKeyRow() {
if (keyRow == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertAllCommand.java
new file mode 100644
index 0000000..1dbad79
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command inserts a batch of rows.
+ */
+public class InsertAllCommand implements WriteCommand {
+ /** Binary rows. The field is {@code transient}; {@code getRows()} rebuilds it from the row bytes when it is {@code null}. */
+ private transient Set<BinaryRow> rows;
+
+ /*
+ * Serialized row bytes, filled in by the constructor through CommandUtils.rowsToBytes.
+ * It is a temporary solution until the network implements correct serialization of BinaryRow.
+ * TODO: Remove the field after (IGNITE-14793).
+ */
+ private byte[] rowsBytes;
+
+ /**
+ * Creates a new instance of InsertAllCommand with the given rows to be inserted.
+ * The {@code rows} must not be {@code null} or empty (checked by an assertion only).
+ *
+ * @param rows Binary rows; the set is stored by reference and is also written to the row bytes.
+ */
+ public InsertAllCommand(@NotNull Set<BinaryRow> rows) {
+ assert rows != null && !rows.isEmpty();
+
+ this.rows = rows;
+
+ CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+ }
+
+ /**
+ * Gets a set of binary rows to be inserted, reading it from the row bytes on first access if the set is not present.
+ *
+ * @return Binary rows, or {@code null} if neither the rows nor the row bytes are present.
+ */
+ public Set<BinaryRow> getRows() {
+ if (rows == null && rowsBytes != null) {
+ rows = new HashSet<>();
+
+ CommandUtils.readRows(rowsBytes, rows::add);
+ }
+
+ return rows;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
index be6ddcc..b12e467 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
@@ -17,12 +17,8 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -30,55 +26,34 @@ import org.jetbrains.annotations.NotNull;
* The command inserts a row.
*/
public class InsertCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
- /** Row. */
+ /** Binary row. */
private transient BinaryRow row;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] rowBytes;
/**
- * @param row Row.
+ * Creates a new instance of InsertCommand with the given row to be inserted.
+ * The {@code row} should not be {@code null}.
+ *
+ * @param row Binary row.
*/
public InsertCommand(@NotNull BinaryRow row) {
assert row != null;
this.row = row;
- rowToBytes(row, bytes -> rowBytes = bytes);
- }
-
- /**
- * Writes a row to byte array.
- *
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
}
/**
- * Gets a data row.
+ * Gets a binary row to be inserted.
*
- * @return Data row.
+ * @return Binary row.
*/
public BinaryRow getRow() {
if (row == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
index 03c83a2..298484f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceCommand.java
@@ -17,12 +17,8 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -30,31 +26,31 @@ import org.jetbrains.annotations.NotNull;
* The command replaces an old entry to a new one.
*/
public class ReplaceCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(ReplaceCommand.class);
-
- /** Row. */
+ /** Replacing binary row. */
private transient BinaryRow row;
- /** Old row. */
+ /** Replaced binary row. */
private transient BinaryRow oldRow;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] rowBytes;
/**
* Old row bytes.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] oldRowBytes;
/**
- * @param oldRow Old row.
- * @param row Row.
+ * Creates a new instance of ReplaceCommand with the given old row and the row that replaces it.
+ * Both rows should not be {@code null}.
+ *
+ * @param oldRow Old binary row.
+ * @param row Binary row.
*/
public ReplaceCommand(@NotNull BinaryRow oldRow, @NotNull BinaryRow row) {
assert oldRow != null;
@@ -63,35 +59,14 @@ public class ReplaceCommand implements WriteCommand {
this.oldRow = oldRow;
this.row = row;
- rowToBytes(oldRow, bytes -> oldRowBytes = bytes);
- rowToBytes(row, bytes -> rowBytes = bytes);
- }
-
- /**
- * Writes a row to byte array.
- *
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(oldRow, bytes -> oldRowBytes = bytes);
+ CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
}
/**
- * Gets a data row.
+ * Gets the binary row that the entry will hold after the replace.
*
- * @return Data row.
+ * @return Binary row.
*/
public BinaryRow getRow() {
if (row == null)
@@ -101,9 +76,9 @@ public class ReplaceCommand implements WriteCommand {
}
/**
- * Gets an old row.
+ * Gets the binary row that the entry should hold before the replace.
*
- * @return Data row.
+ * @return Binary row.
*/
public BinaryRow getOldRow() {
if (oldRow == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceIfExistCommand.java
similarity index 57%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceIfExistCommand.java
index be6ddcc..4f20ced 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/InsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/ReplaceIfExistCommand.java
@@ -17,68 +17,43 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
/**
- * The command inserts a row.
+ * The command replaces an old entry with a new one.
*/
-public class InsertCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
- /** Row. */
+public class ReplaceIfExistCommand implements WriteCommand {
+ /** Binary row. */
private transient BinaryRow row;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] rowBytes;
/**
- * @param row Row.
+ * Creates a new instance of ReplaceIfExistCommand with the given row to be replaced.
+ * The {@code row} should not be {@code null}.
+ *
+ * @param row Binary row.
*/
- public InsertCommand(@NotNull BinaryRow row) {
+ public ReplaceIfExistCommand(@NotNull BinaryRow row) {
assert row != null;
this.row = row;
- rowToBytes(row, bytes -> rowBytes = bytes);
- }
-
- /**
- * Writes a row to byte array.
- *
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
}
/**
- * Gets a data row.
+ * Gets a binary row to be replaced.
*
- * @return Data row.
+ * @return Binary row.
*/
public BinaryRow getRow() {
if (row == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertAllCommand.java
new file mode 100644
index 0000000..4f6ed5f
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertAllCommand.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command;
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.raft.client.WriteCommand;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * The command upserts a batch of rows.
+ */
+public class UpsertAllCommand implements WriteCommand {
+ /** Binary rows. The field is {@code transient}; {@code getRows()} rebuilds it from the row bytes when it is {@code null}. */
+ private transient Set<BinaryRow> rows;
+
+ /*
+ * Serialized row bytes, filled in by the constructor through CommandUtils.rowsToBytes.
+ * It is a temporary solution until the network implements correct serialization of BinaryRow.
+ * TODO: Remove the field after (IGNITE-14793).
+ */
+ private byte[] rowsBytes;
+
+ /**
+ * Creates a new instance of UpsertAllCommand with the given rows to be upserted.
+ * The {@code rows} must not be {@code null} or empty (checked by an assertion only).
+ *
+ * @param rows Binary rows; the set is stored by reference and is also written to the row bytes.
+ */
+ public UpsertAllCommand(@NotNull Set<BinaryRow> rows) {
+ assert rows != null && !rows.isEmpty();
+
+ this.rows = rows;
+
+ CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+ }
+
+ /**
+ * Gets a set of binary rows to be upserted, reading it from the row bytes on first access if the set is not present.
+ *
+ * @return Binary rows, or {@code null} if neither the rows nor the row bytes are present.
+ */
+ public Set<BinaryRow> getRows() {
+ if (rows == null && rowsBytes != null) {
+ rows = new HashSet<>();
+
+ CommandUtils.readRows(rowsBytes, rows::add);
+ }
+
+ return rows;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java
index cc46aca..9f2e1ef 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpsertCommand.java
@@ -17,12 +17,8 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.lang.IgniteLogger;
import org.apache.ignite.raft.client.WriteCommand;
import org.jetbrains.annotations.NotNull;
@@ -30,55 +26,34 @@ import org.jetbrains.annotations.NotNull;
* The command inserts or updates a value for the key specified.
*/
public class UpsertCommand implements WriteCommand {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(UpsertCommand.class);
-
- /** Row. */
+ /** Binary row. */
private transient BinaryRow row;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] rowBytes;
/**
- * @param row Row.
+ * Creates a new instance of UpsertCommand with the given row to be upserted.
+ * The {@code row} should not be {@code null}.
+ *
+ * @param row Binary row.
*/
public UpsertCommand(@NotNull BinaryRow row) {
assert row != null;
this.row = row;
- rowToBytes(row, bytes -> rowBytes = bytes);
- }
-
- /**
- * Writes a row to byte array.
- *
- * @param row Row.
- * @param consumer Byte array consumer.
- */
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
-
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
}
/**
- * Gets a data row.
+ * Gets a binary row to be upserted.
*
- * @return Data row.
+ * @return Binary row.
*/
public BinaryRow getRow() {
if (row == null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
new file mode 100644
index 0000000..7c9d969
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.command.response;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashSet;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+
+/**
+ * Response object that carries the collection of {@link BinaryRow}s produced by a batch operation.
+ * @see GetAllCommand
+ * @see DeleteAllCommand
+ * @see InsertAllCommand
+ * @see DeleteExactAllCommand
+ */
+public class MultiRowsResponse implements Serializable {
+ /** Binary rows. The field is transient; {@link #getValues()} rebuilds it from the byte form when it is {@code null}. */
+ private transient Collection<BinaryRow> rows;
+
+ /*
+ * Byte form of the rows, filled in by the constructor.
+ * This is a temporary solution until the network layer implements proper serialization of BinaryRow.
+ * TODO: Remove the field after (IGNITE-14793).
+ */
+ private byte[] rowsBytes;
+
+ /**
+ * Creates a new instance of MultiRowsResponse with the given collection of binary rows.
+ *
+ * @param rows Collection of binary rows; it is kept as is and also converted to bytes via {@link CommandUtils}.
+ */
+ public MultiRowsResponse(Collection<BinaryRow> rows) {
+ this.rows = rows;
+
+ CommandUtils.rowsToBytes(rows, bytes -> rowsBytes = bytes);
+ }
+
+ /**
+ * @return Binary rows, read into a new {@link HashSet} if only the byte form is set; {@code null} if neither is set.
+ */
+ public Collection<BinaryRow> getValues() {
+ if (rows == null && rowsBytes != null) {
+ rows = new HashSet<>();
+
+ CommandUtils.readRows(rowsBytes, rows::add);
+ }
+
+ return rows;
+ }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
similarity index 56%
rename from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java
rename to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
index 803cee1..4465fac 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/KVGetResponse.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
@@ -17,67 +17,46 @@
package org.apache.ignite.internal.table.distributed.command.response;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
import java.io.Serializable;
-import java.util.function.Consumer;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.table.distributed.command.CommandUtils;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
-import org.apache.ignite.lang.IgniteLogger;
/**
- * It is a response object for handling a table get command.
+ * This class represents a response object message that contains a single {@link BinaryRow}.
+ * @see GetCommand
+ * @see GetAndDeleteCommand
+ * @see GetAndUpsertCommand
+ * @see GetAndReplaceCommand
*/
-public class KVGetResponse implements Serializable {
- /** The logger. */
- private static final IgniteLogger LOG = IgniteLogger.forClass(GetCommand.class);
-
- /** Row. */
+public class SingleRowResponse implements Serializable {
+ /** Binary row. */
private transient BinaryRow row;
/*
* Row bytes.
* It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after.
+ * TODO: Remove the field after (IGNITE-14793).
*/
private byte[] rowBytes;
- public KVGetResponse(BinaryRow row) {
- this.row = row;
-
- rowToBytes(row, bytes -> rowBytes = bytes);
- }
-
/**
- * Writes a row to byte array.
+ * Creates a new instance of SingleRowResponse with the given binary row.
*
- * @param row Row.
- * @param consumer Byte array consumer.
+ * @param row Binary row.
*/
- private void rowToBytes(BinaryRow row, Consumer<byte[]> consumer) {
- if (row == null) {
- consumer.accept(null);
-
- return;
- }
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- consumer.accept(baos.toByteArray());
- }
- catch (IOException e) {
- LOG.error("Could not write row to stream [row=" + row + ']', e);
+ public SingleRowResponse(BinaryRow row) {
+ this.row = row;
- consumer.accept(null);
- }
+ CommandUtils.rowToBytes(row, bytes -> rowBytes = bytes);
}
/**
- * @return Data row.
+ * @return Binary row.
*/
public BinaryRow getValue() {
if (row == null && rowBytes != null)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 5f49636..34141e8 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -19,15 +19,29 @@ package org.apache.ignite.internal.table.distributed.raft;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
-import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
import org.apache.ignite.raft.client.ReadCommand;
import org.apache.ignite.raft.client.WriteCommand;
import org.apache.ignite.raft.client.service.CommandClosure;
@@ -38,7 +52,11 @@ import org.jetbrains.annotations.NotNull;
* Partition command handler.
*/
public class PartitionListener implements RaftGroupListener {
- /** Storage. */
+ /**
+ * Storage.
+ * This is a temporary solution that is used until the persistence layer is implemented.
+ * TODO: IGNITE-14790.
+ */
private ConcurrentHashMap<KeyWrapper, BinaryRow> storage = new ConcurrentHashMap<>();
/** {@inheritDoc} */
@@ -46,9 +64,27 @@ public class PartitionListener implements RaftGroupListener {
while (iterator.hasNext()) {
CommandClosure<ReadCommand> clo = iterator.next();
- assert clo.command() instanceof GetCommand;
+ if (clo.command() instanceof GetCommand) {
+ clo.result(new SingleRowResponse(storage.get(
+ extractAndWrapKey(((GetCommand)clo.command()).getKeyRow())
+ )));
+ }
+ else if (clo.command() instanceof GetAllCommand) {
+ Set<BinaryRow> keyRows = ((GetAllCommand)clo.command()).getKeyRows();
+
+ assert keyRows != null && !keyRows.isEmpty();
- clo.result(new KVGetResponse(storage.get(extractAndWrapKey(((GetCommand)clo.command()).getKeyRow()))));
+ final Set<BinaryRow> res = keyRows.stream()
+ .map(this::extractAndWrapKey)
+ .map(storage::get)
+ .filter(Objects::nonNull)
+ .filter(BinaryRow::hasValue)
+ .collect(Collectors.toSet());
+
+ clo.result(new MultiRowsResponse(res));
+ }
+ else
+ assert false : "Command was not found [cmd=" + clo.command() + ']';
}
}
@@ -58,10 +94,11 @@ public class PartitionListener implements RaftGroupListener {
CommandClosure<WriteCommand> clo = iterator.next();
if (clo.command() instanceof InsertCommand) {
- BinaryRow previous = storage.putIfAbsent(
- extractAndWrapKey(((InsertCommand)clo.command()).getRow()),
- ((InsertCommand)clo.command()).getRow()
- );
+ BinaryRow row = ((InsertCommand)clo.command()).getRow();
+
+ assert row.hasValue() : "Insert command should have a value.";
+
+ BinaryRow previous = storage.putIfAbsent(extractAndWrapKey(row), row);
clo.result(previous == null);
}
@@ -91,13 +128,140 @@ public class PartitionListener implements RaftGroupListener {
clo.result(false);
}
else if (clo.command() instanceof UpsertCommand) {
- storage.put(
- extractAndWrapKey(((UpsertCommand)clo.command()).getRow()),
- ((UpsertCommand)clo.command()).getRow()
- );
+ BinaryRow row = ((UpsertCommand)clo.command()).getRow();
+
+ assert row.hasValue() : "Upsert command should have a value.";
+
+ storage.put(extractAndWrapKey(row), row);
clo.result(null);
}
+ else if (clo.command() instanceof InsertAllCommand) {
+ Set<BinaryRow> rows = ((InsertAllCommand)clo.command()).getRows();
+
+ assert rows != null && !rows.isEmpty();
+
+ final Set<BinaryRow> res = rows.stream()
+ .map(k -> storage.putIfAbsent(extractAndWrapKey(k), k) == null ? null : k)
+ .filter(Objects::nonNull)
+ .filter(BinaryRow::hasValue)
+ .collect(Collectors.toSet());
+
+ clo.result(new MultiRowsResponse(res));
+ }
+ else if (clo.command() instanceof UpsertAllCommand) {
+ Set<BinaryRow> rows = ((UpsertAllCommand)clo.command()).getRows();
+
+ assert rows != null && !rows.isEmpty();
+
+ rows.forEach(k -> storage.put(extractAndWrapKey(k), k));
+
+ clo.result(null);
+ }
+ else if (clo.command() instanceof DeleteAllCommand) {
+ Set<BinaryRow> rows = ((DeleteAllCommand)clo.command()).getRows();
+
+ assert rows != null && !rows.isEmpty();
+
+ final Set<BinaryRow> res = rows.stream()
+ .map(k -> {
+ if (k.hasValue())
+ return null;
+ else
+ return storage.remove(extractAndWrapKey(k));
+ })
+ .filter(Objects::nonNull)
+ .filter(BinaryRow::hasValue)
+ .collect(Collectors.toSet());
+
+ clo.result(new MultiRowsResponse(res));
+ }
+ else if (clo.command() instanceof DeleteExactCommand) {
+ BinaryRow row = ((DeleteExactCommand)clo.command()).getRow();
+
+ assert row != null;
+ assert row.hasValue();
+
+ final KeyWrapper key = extractAndWrapKey(row);
+ final BinaryRow old = storage.get(key);
+
+ if (old == null || !old.hasValue())
+ clo.result(false);
+ else
+ clo.result(equalValues(row, old) && storage.remove(key) != null);
+ }
+ else if (clo.command() instanceof DeleteExactAllCommand) {
+ Set<BinaryRow> rows = ((DeleteExactAllCommand)clo.command()).getRows();
+
+ assert rows != null && !rows.isEmpty();
+
+ final Set<BinaryRow> res = rows.stream()
+ .map(k -> {
+ final KeyWrapper key = extractAndWrapKey(k);
+ final BinaryRow old = storage.get(key);
+
+ if (old == null || !old.hasValue() || !equalValues(k, old))
+ return null;
+
+ return storage.remove(key);
+ })
+ .filter(Objects::nonNull)
+ .filter(BinaryRow::hasValue)
+ .collect(Collectors.toSet());
+
+ clo.result(new MultiRowsResponse(res));
+ }
+ else if (clo.command() instanceof ReplaceIfExistCommand) {
+ BinaryRow row = ((ReplaceIfExistCommand)clo.command()).getRow();
+
+ assert row != null;
+
+ final KeyWrapper key = extractAndWrapKey(row);
+ final BinaryRow oldRow = storage.get(key);
+
+ if (oldRow == null || !oldRow.hasValue())
+ clo.result(false);
+ else
+ clo.result(storage.put(key, row) == oldRow);
+ }
+ else if (clo.command() instanceof GetAndDeleteCommand) {
+ BinaryRow row = ((GetAndDeleteCommand)clo.command()).getKeyRow();
+
+ assert row != null;
+
+ BinaryRow oldRow = storage.remove(extractAndWrapKey(row));
+
+ if (oldRow == null || !oldRow.hasValue())
+ clo.result(new SingleRowResponse(null));
+ else
+ clo.result(new SingleRowResponse(oldRow));
+ }
+ else if (clo.command() instanceof GetAndReplaceCommand) {
+ BinaryRow row = ((GetAndReplaceCommand)clo.command()).getRow();
+
+ assert row != null && row.hasValue();
+
+ BinaryRow oldRow = storage.get(extractAndWrapKey(row));
+
+ storage.computeIfPresent(extractAndWrapKey(row), (key, val) -> row);
+
+ if (oldRow == null || !oldRow.hasValue())
+ clo.result(new SingleRowResponse(null));
+ else
+ clo.result(new SingleRowResponse(oldRow));
+ }
+ else if (clo.command() instanceof GetAndUpsertCommand) {
+ BinaryRow row = ((GetAndUpsertCommand)clo.command()).getKeyRow();
+
+ assert row != null && row.hasValue();
+
+ BinaryRow oldRow = storage.put(extractAndWrapKey(row), row);
+
+ if (oldRow == null || !oldRow.hasValue())
+ clo.result(new SingleRowResponse(null));
+ else
+ clo.result(new SingleRowResponse(oldRow));
+ }
else
assert false : "Command was not found [cmd=" + clo.command() + ']';
}
@@ -115,28 +279,6 @@ public class PartitionListener implements RaftGroupListener {
}
/**
- * @param row Row.
- * @return Extracted key.
- */
- @NotNull private boolean equalValues(@NotNull BinaryRow row, @NotNull BinaryRow row2) {
- if (row.hasValue() ^ row2.hasValue())
- return false;
-
- return row.valueSlice().compareTo(row2.valueSlice()) == 0;
- }
-
- /**
- * @param row Row.
- * @return Extracted key.
- */
- @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
- final byte[] bytes = new byte[row.keySlice().capacity()];
- row.keySlice().get(bytes);
-
- return new KeyWrapper(bytes, row.hash());
- }
-
- /**
* Wrapper provides correct byte[] comparison.
*/
private static class KeyWrapper {
@@ -175,4 +317,37 @@ public class PartitionListener implements RaftGroupListener {
return hash;
}
}
+
+ /**
+ * Compares the value parts of two rows.
+ *
+ * @param row Row to compare, may be {@code null}.
+ * @param row2 Row to compare, may be {@code null}.
+ * @return True if both arguments are the same reference or their value slices compare equal, false otherwise.
+ */
+ private boolean equalValues(BinaryRow row, BinaryRow row2) {
+ if (row == row2)
+ return true;
+
+ if (row == null || row2 == null)
+ return false;
+
+ if (row.hasValue() ^ row2.hasValue()) // Exactly one of the rows carries a value.
+ return false;
+
+ return row.valueSlice().compareTo(row2.valueSlice()) == 0;
+ }
+
+ /**
+ * Copies the key slice of a table row into a byte array and wraps it, together with the row hash, in a {@link KeyWrapper}.
+ *
+ * @param row Row to take the key slice and hash from.
+ * @return Wrapped key.
+ */
+ @NotNull private KeyWrapper extractAndWrapKey(@NotNull BinaryRow row) {
+ final byte[] bytes = new byte[row.keySlice().capacity()];
+ row.keySlice().get(bytes);
+
+ return new KeyWrapper(bytes, row.hash());
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index ce67913..8b5213c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -17,18 +17,32 @@
package org.apache.ignite.internal.table.distributed.storage;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.InsertCommand;
import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
-import org.apache.ignite.internal.table.distributed.command.response.KVGetResponse;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
import org.apache.ignite.raft.client.service.RaftGroupService;
import org.jetbrains.annotations.NotNull;
@@ -71,13 +85,35 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<BinaryRow> get(BinaryRow keyRow) {
- return partitionMap.get(partId(keyRow)).<KVGetResponse>run(new GetCommand(keyRow))
- .thenApply(KVGetResponse::getValue);
+ return partitionMap.get(partId(keyRow)).<SingleRowResponse>run(new GetCommand(keyRow))
+ .thenApply(SingleRowResponse::getValue);
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<BinaryRow>> getAll(Collection<BinaryRow> keyRows) {
- return null;
+ HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+ for (BinaryRow keyRow : keyRows) {
+ // A lambda is required here: HashSet::new would bind to HashSet(int) and use the partition id as capacity.
+ keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>()).add(keyRow);
+ }
+
+ CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+ int batchNum = 0;
+
+ for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+ futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new GetAllCommand(partToRows.getValue()));
+
+ batchNum++;
+ }
+
+ return CompletableFuture.allOf(futures)
+ .thenApply(response -> Arrays.stream(futures)
+ .map(CompletableFuture::join)
+ .map(MultiRowsResponse::getValues)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
}
/** {@inheritDoc} */
@@ -87,12 +123,30 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Void> upsertAll(Collection<BinaryRow> rows) {
- return null;
+ HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+ for (BinaryRow keyRow : rows) {
+ // A lambda is required here: HashSet::new would bind to HashSet(int) and use the partition id as capacity.
+ keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>()).add(keyRow);
+ }
+
+ CompletableFuture<Void>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+ int batchNum = 0;
+
+ for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+ futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new UpsertAllCommand(partToRows.getValue()));
+
+ batchNum++;
+ }
+
+ return CompletableFuture.allOf(futures);
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<BinaryRow> getAndUpsert(BinaryRow row) {
- return null;
+ return partitionMap.get(partId(row)).<SingleRowResponse>run(new GetAndUpsertCommand(row))
+ .thenApply(SingleRowResponse::getValue);
}
/** {@inheritDoc} */
@@ -102,12 +156,34 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<BinaryRow>> insertAll(Collection<BinaryRow> rows) {
- return null;
+ HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+ for (BinaryRow keyRow : rows) {
+ // A lambda is required here: HashSet::new would bind to HashSet(int) and use the partition id as capacity.
+ keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>()).add(keyRow);
+ }
+
+ CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+ int batchNum = 0;
+
+ for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+ futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new InsertAllCommand(partToRows.getValue()));
+
+ batchNum++;
+ }
+
+ return CompletableFuture.allOf(futures)
+ .thenApply(response -> Arrays.stream(futures)
+ .map(CompletableFuture::join)
+ .map(MultiRowsResponse::getValues)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> replace(BinaryRow row) {
- return null;
+ return partitionMap.get(partId(row)).<Boolean>run(new ReplaceIfExistCommand(row));
}
/** {@inheritDoc} */
@@ -117,7 +193,8 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<BinaryRow> getAndReplace(BinaryRow row) {
- return null;
+ return partitionMap.get(partId(row)).<SingleRowResponse>run(new org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand(row))
+ .thenApply(SingleRowResponse::getValue); // ReplaceIfExistCommand is answered with a Boolean, so it cannot be used here.
}
/** {@inheritDoc} */
@@ -127,22 +204,67 @@ public class InternalTableImpl implements InternalTable {
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Boolean> deleteExact(BinaryRow oldRow) {
- return null;
+ return partitionMap.get(partId(oldRow)).<Boolean>run(new DeleteExactCommand(oldRow));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<BinaryRow> getAndDelete(BinaryRow row) {
- return null;
+ return partitionMap.get(partId(row)).<SingleRowResponse>run(new GetAndDeleteCommand(row))
+ .thenApply(SingleRowResponse::getValue);
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<BinaryRow>> deleteAll(Collection<BinaryRow> rows) {
- return null;
+ HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+ for (BinaryRow keyRow : rows) {
+ // A lambda is required here: HashSet::new would bind to HashSet(int) and use the partition id as capacity.
+ keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>()).add(keyRow);
+ }
+
+ CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+ int batchNum = 0;
+
+ for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+ futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new DeleteAllCommand(partToRows.getValue()));
+
+ batchNum++;
+ }
+
+ return CompletableFuture.allOf(futures)
+ .thenApply(response -> Arrays.stream(futures)
+ .map(CompletableFuture::join)
+ .map(MultiRowsResponse::getValues)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
}
/** {@inheritDoc} */
@Override public @NotNull CompletableFuture<Collection<BinaryRow>> deleteAllExact(Collection<BinaryRow> rows) {
- return null;
+ HashMap<Integer, HashSet<BinaryRow>> keyRowsByPartition = new HashMap<>();
+
+ for (BinaryRow keyRow : rows) {
+ // A lambda is required here: HashSet::new would bind to HashSet(int) and use the partition id as capacity.
+ keyRowsByPartition.computeIfAbsent(partId(keyRow), part -> new HashSet<>()).add(keyRow);
+ }
+
+ CompletableFuture<MultiRowsResponse>[] futures = new CompletableFuture[keyRowsByPartition.size()];
+
+ int batchNum = 0;
+
+ for (Map.Entry<Integer, HashSet<BinaryRow>> partToRows : keyRowsByPartition.entrySet()) {
+ futures[batchNum] = partitionMap.get(partToRows.getKey()).run(new DeleteExactAllCommand(partToRows.getValue()));
+
+ batchNum++;
+ }
+
+ return CompletableFuture.allOf(futures)
+ .thenApply(response -> Arrays.stream(futures)
+ .map(CompletableFuture::join)
+ .map(MultiRowsResponse::getValues)
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
new file mode 100644
index 0000000..8621d20
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -0,0 +1,700 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft;
+
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.UUID;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.NativeTypes;
+import org.apache.ignite.internal.schema.Row;
+import org.apache.ignite.internal.schema.RowAssembler;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.table.distributed.command.DeleteAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactAllCommand;
+import org.apache.ignite.internal.table.distributed.command.DeleteExactCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAllCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndDeleteCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.GetAndUpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.GetCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.InsertCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceCommand;
+import org.apache.ignite.internal.table.distributed.command.ReplaceIfExistCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertAllCommand;
+import org.apache.ignite.internal.table.distributed.command.UpsertCommand;
+import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
+import org.apache.ignite.internal.table.distributed.command.response.SingleRowResponse;
+import org.apache.ignite.raft.client.Command;
+import org.apache.ignite.raft.client.service.CommandClosure;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the table command listener.
+ * All rows should be removed before returning from each test.
+ */
+public class PartitionCommandListenerTest {
+ /** Key count. */
+ public static final int KEY_COUNT = 100;
+
+ /** Schema. */
+ public static SchemaDescriptor SCHEMA = new SchemaDescriptor(UUID.randomUUID(),
+ 1,
+ new Column[] {new Column("key", NativeTypes.INTEGER, false)},
+ new Column[] {new Column("value", NativeTypes.INTEGER, false)}
+ );
+
+ /** Table command listener. */
+ private static PartitionListener commandListener;
+
+ /**
+ * Initializes a table listener before tests.
+ */
+ @BeforeAll
+ public static void before() {
+ commandListener = new PartitionListener();
+ }
+
+ /**
+ * Inserts rows and checks them.
+ * All rows are removed before returning.
+ */
+ @Test
+ public void testInsertCommands() {
+ readAndCheck(false);
+
+ delete(false);
+
+ insert(false);
+
+ insert(true);
+
+ readAndCheck(true);
+
+ delete(true);
+ }
+
+ /**
+ * Upserts rows and checks them.
+ * All rows are removed before returning.
+ */
+ @Test
+ public void testUpsertValues() {
+ readAndCheck(false);
+
+ upsert();
+
+ readAndCheck(true);
+
+ delete(true);
+
+ readAndCheck(false);
+ }
+
+ /**
+ * Adds rows, replaces and checks them.
+ * All rows are removed before returning.
+ */
+ @Test
+ public void testReplaceCommand() {
+ upsert();
+
+ deleteExactValues(false);
+
+ replaceValues(true);
+
+ readAndCheck(true, i -> i + 1);
+
+ replaceValues(false);
+
+ readAndCheck(true, i -> i + 1);
+
+ deleteExactValues(true);
+
+ readAndCheck(false);
+ }
+
+ /**
+ * The test checks PutIfExist command.
+ * All rows are removed before returning.
+ */
+ @Test
+ public void testPutIfExistCommand() {
+ putIfExistValues(false);
+
+ readAndCheck(false);
+
+ upsert();
+
+ putIfExistValues(true);
+
+ readAndCheck(true, i -> i + 1);
+
+ getAndDeleteValues(true);
+
+ readAndCheck(false);
+
+ getAndDeleteValues(false);
+ }
+
+ /**
+ * The test checks GetAndReplace command.
+ * All rows are removed before returning.
+ */
+ @Test
+ public void testGetAndReplaceCommand() {
+ readAndCheck(false);
+
+ getAndUpsertValues(false);
+
+ readAndCheck(true);
+
+ getAndReplaceValues(true);
+
+ readAndCheck(true, i -> i + 1);
+
+ getAndUpsertValues(true);
+
+ readAndCheck(true);
+
+ deleteExactAllValues(true);
+
+ readAndCheck(false);
+
+ getAndReplaceValues(false);
+
+ deleteExactAllValues(false);
+ }
+
+ /**
+ * The test checks a batch upsert command.
+ * All rows are removed before returning.
+ */
+ @Test
+ public void testUpsertRowsBatchedAndCheck() {
+ readAll(false);
+
+ deleteAll(false);
+
+ upsertAll();
+
+ readAll(true);
+
+ deleteAll(true);
+
+ readAll(false);
+ }
+
+ /**
+ * The test checks a batch insert command, using batch reads and batch deletes to verify it.
+ * All rows are removed before returning.
+ */
+ @Test
+ public void testInsertRowsBatchedAndCheck() {
+ // Nothing is stored yet: both the batch read and the batch delete must answer with an empty response.
+ readAll(false);
+
+ deleteAll(false);
+
+ // First insert of rows (i, i): an empty response is expected.
+ insertAll(false);
+
+ readAll(true);
+
+ // Second insert of the same rows: the response is expected to contain all KEY_COUNT rows.
+ insertAll(true);
+
+ deleteAll(true);
+
+ readAll(false);
+ }
+
+ /**
+  * Prepares a closure iterator for a specific batch operation. The iterator yields exactly one
+  * mocked closure, which is passed to {@code func} right before it is returned.
+  *
+  * @param func The function that prepares (stubs) the closure for the operation.
+  * @param <T> Type of the operation.
+  * @return Closure iterator.
+  */
+ private <T extends Command> Iterator<CommandClosure<T>> batchIterator(Consumer<CommandClosure<T>> func) {
+     return new Iterator<CommandClosure<T>>() {
+         /** Set once the only closure has been handed out. */
+         private boolean moved;
+
+         /** {@inheritDoc} */
+         @Override public boolean hasNext() {
+             return !moved;
+         }
+
+         /** {@inheritDoc} */
+         @Override public CommandClosure<T> next() {
+             // Honor the Iterator contract instead of silently producing extra closures past the end.
+             if (moved)
+                 throw new java.util.NoSuchElementException();
+
+             // Mockito returns a raw mock, so the assignment is an unchecked conversion.
+             @SuppressWarnings("unchecked")
+             CommandClosure<T> clo = mock(CommandClosure.class);
+
+             func.accept(clo);
+
+             moved = true;
+
+             return clo;
+         }
+     };
+ }
+
+ /**
+  * Prepares a closure iterator for a specific single-row operation. The iterator yields
+  * {@code KEY_COUNT} mocked closures; each one is passed to {@code func} together with its
+  * zero-based position right before it is returned.
+  *
+  * @param func The function that prepares (stubs) the closure for the operation.
+  * @param <T> Type of the operation.
+  * @return Closure iterator.
+  */
+ private <T extends Command> Iterator<CommandClosure<T>> iterator(BiConsumer<Integer, CommandClosure<T>> func) {
+     return new Iterator<CommandClosure<T>>() {
+         /** Iteration. */
+         private int i = 0;
+
+         /** {@inheritDoc} */
+         @Override public boolean hasNext() {
+             return i < KEY_COUNT;
+         }
+
+         /** {@inheritDoc} */
+         @Override public CommandClosure<T> next() {
+             // Honor the Iterator contract instead of silently producing closures for keys past KEY_COUNT.
+             if (!hasNext())
+                 throw new java.util.NoSuchElementException();
+
+             // Mockito returns a raw mock, so the assignment is an unchecked conversion.
+             @SuppressWarnings("unchecked")
+             CommandClosure<T> clo = mock(CommandClosure.class);
+
+             func.accept(i, clo);
+
+             i++;
+
+             return clo;
+         }
+     };
+ }
+
+ /**
+  * Sends one batched insert of rows {@code (i, i)} for every key below {@code KEY_COUNT}
+  * and validates the response passed to the closure.
+  *
+  * @param existed {@code true} if the response must contain all {@code KEY_COUNT} rows, each with
+  *      a value equal to its key; {@code false} if the response must be empty.
+  */
+ private void insertAll(boolean existed) {
+     commandListener.onWrite(batchIterator(closure -> {
+         Set<BinaryRow> batch = new HashSet<>(KEY_COUNT);
+
+         for (int idx = 0; idx < KEY_COUNT; idx++)
+             batch.add(getTestRow(idx, idx));
+
+         when(closure.command()).thenReturn(new InsertAllCommand(batch));
+
+         doAnswer(call -> {
+             MultiRowsResponse reply = call.getArgument(0);
+
+             if (!existed) {
+                 assertTrue(reply.getValues().isEmpty());
+
+                 return null;
+             }
+
+             assertEquals(KEY_COUNT, reply.getValues().size());
+
+             for (BinaryRow returned : reply.getValues()) {
+                 Row row = new Row(SCHEMA, returned);
+
+                 int key = row.intValue(0);
+
+                 assertTrue(key < KEY_COUNT);
+                 assertEquals(key, row.intValue(1));
+             }
+
+             return null;
+         }).when(closure).result(any(MultiRowsResponse.class));
+     }));
+ }
+
+ /**
+  * Sends one batched upsert of rows {@code (i, i)} for every key below {@code KEY_COUNT}.
+  * The result passed to the closure is expected to be {@code null}.
+  */
+ private void upsertAll() {
+     commandListener.onWrite(batchIterator(closure -> {
+         Set<BinaryRow> batch = new HashSet<>(KEY_COUNT);
+
+         for (int idx = 0; idx < KEY_COUNT; idx++)
+             batch.add(getTestRow(idx, idx));
+
+         when(closure.command()).thenReturn(new UpsertAllCommand(batch));
+
+         doAnswer(call -> {
+             assertNull(call.getArgument(0));
+
+             return null;
+         }).when(closure).result(any());
+     }));
+ }
+
+ /**
+  * Sends one batched delete for every key below {@code KEY_COUNT} and validates the response
+  * passed to the closure.
+  *
+  * @param existed {@code true} if the response must contain all {@code KEY_COUNT} rows, each with
+  *      a value equal to its key; {@code false} if the response must be empty.
+  */
+ private void deleteAll(boolean existed) {
+     commandListener.onWrite(batchIterator(closure -> {
+         Set<BinaryRow> keys = new HashSet<>(KEY_COUNT);
+
+         for (int idx = 0; idx < KEY_COUNT; idx++)
+             keys.add(getTestKey(idx));
+
+         when(closure.command()).thenReturn(new DeleteAllCommand(keys));
+
+         doAnswer(call -> {
+             MultiRowsResponse reply = call.getArgument(0);
+
+             if (!existed) {
+                 assertTrue(reply.getValues().isEmpty());
+
+                 return null;
+             }
+
+             assertEquals(KEY_COUNT, reply.getValues().size());
+
+             for (BinaryRow returned : reply.getValues()) {
+                 Row row = new Row(SCHEMA, returned);
+
+                 int key = row.intValue(0);
+
+                 assertTrue(key < KEY_COUNT);
+                 assertEquals(key, row.intValue(1));
+             }
+
+             return null;
+         }).when(closure).result(any(MultiRowsResponse.class));
+     }));
+ }
+
+ /**
+  * Sends one batched read for every key below {@code KEY_COUNT} and validates the response
+  * passed to the closure.
+  *
+  * @param existed {@code true} if the response must contain all {@code KEY_COUNT} rows, each with
+  *      a value equal to its key; {@code false} if the response must be empty.
+  */
+ private void readAll(boolean existed) {
+     commandListener.onRead(batchIterator(closure -> {
+         Set<BinaryRow> keys = new HashSet<>(KEY_COUNT);
+
+         for (int idx = 0; idx < KEY_COUNT; idx++)
+             keys.add(getTestKey(idx));
+
+         when(closure.command()).thenReturn(new GetAllCommand(keys));
+
+         doAnswer(call -> {
+             MultiRowsResponse reply = call.getArgument(0);
+
+             if (!existed) {
+                 assertTrue(reply.getValues().isEmpty());
+
+                 return null;
+             }
+
+             assertEquals(KEY_COUNT, reply.getValues().size());
+
+             for (BinaryRow returned : reply.getValues()) {
+                 Row row = new Row(SCHEMA, returned);
+
+                 int key = row.intValue(0);
+
+                 assertTrue(key < KEY_COUNT);
+                 assertEquals(key, row.intValue(1));
+             }
+
+             return null;
+         }).when(closure).result(any(MultiRowsResponse.class));
+     }));
+ }
+
+ /**
+  * Upserts rows {@code (i, i)} one command per key. The result passed to each closure
+  * is expected to be {@code null}.
+  */
+ private void upsert() {
+     commandListener.onWrite(iterator((key, closure) -> {
+         doAnswer(call -> {
+             assertNull(call.getArgument(0));
+
+             return null;
+         }).when(closure).result(any());
+
+         when(closure.command()).thenReturn(new UpsertCommand(getTestRow(key, key)));
+     }));
+ }
+
+ /**
+  * Deletes rows by key, one command per key, and checks the result passed to each closure.
+  *
+  * @param existed The result every delete command is expected to report: {@code true} if the rows
+  *      are stored, {@code false} otherwise.
+  */
+ private void delete(boolean existed) {
+     commandListener.onWrite(iterator((key, closure) -> {
+         doAnswer(call -> {
+             assertEquals(existed, call.getArgument(0));
+
+             return null;
+         }).when(closure).result(any());
+
+         when(closure.command()).thenReturn(new DeleteCommand(getTestKey(key)));
+     }));
+ }
+
+ /**
+  * Reads rows from the listener and checks them, expecting every value to be equal to its key.
+  *
+  * @param existed {@code true} if the rows are expected to be found, {@code false} otherwise.
+  */
+ private void readAndCheck(boolean existed) {
+     readAndCheck(existed, Function.identity());
+ }
+
+ /**
+  * Reads rows from the listener, one command per key, and checks the values against a mapper.
+  *
+  * @param existed {@code true} if the rows are expected to be found, {@code false} if every read
+  *      must return no row.
+  * @param keyValueMapper Maps a key to the value which is expected to be stored for it.
+  */
+ private void readAndCheck(boolean existed, Function<Integer, Integer> keyValueMapper) {
+     commandListener.onRead(iterator((key, closure) -> {
+         doAnswer(call -> {
+             SingleRowResponse reply = call.getArgument(0);
+
+             if (!existed) {
+                 assertNull(reply.getValue());
+
+                 return null;
+             }
+
+             assertNotNull(reply.getValue());
+
+             assertTrue(reply.getValue().hasValue());
+
+             Row row = new Row(SCHEMA, reply.getValue());
+
+             assertEquals(key, row.intValue(0));
+             assertEquals(keyValueMapper.apply(key), row.intValue(1));
+
+             return null;
+         }).when(closure).result(any(SingleRowResponse.class));
+
+         when(closure.command()).thenReturn(new GetCommand(getTestKey(key)));
+     }));
+ }
+
+ /**
+  * Inserts rows {@code (i, i)}, one command per key, and checks the result passed to each closure.
+  *
+  * @param existed {@code true} if the rows are already stored, in which case every insert is expected
+  *      to report {@code false}; {@code false} if they are absent and every insert must report {@code true}.
+  */
+ private void insert(boolean existed) {
+     commandListener.onWrite(iterator((i, clo) -> {
+         when(clo.command()).thenReturn(new InsertCommand(getTestRow(i, i)));
+
+         // Stub every result, not only the expected one: matching on !existed alone would make the
+         // assertion below tautological and let an unexpected outcome pass unnoticed.
+         doAnswer(invocation -> {
+             assertEquals(!existed, invocation.getArgument(0));
+
+             return null;
+         }).when(clo).result(any());
+     }));
+ }
+
+ /**
+  * Sends one batched delete-exact of rows {@code (i, i)} for every key below {@code KEY_COUNT}
+  * and validates the response passed to the closure.
+  *
+  * @param existed {@code true} if the response must contain all {@code KEY_COUNT} rows, each with
+  *      a value equal to its key; {@code false} if the response must be empty.
+  */
+ private void deleteExactAllValues(boolean existed) {
+     commandListener.onWrite(batchIterator(clo -> {
+         // Typed like the sets built by the other batch helpers instead of a raw HashSet.
+         Set<BinaryRow> rows = new HashSet<>(KEY_COUNT);
+
+         for (int i = 0; i < KEY_COUNT; i++)
+             rows.add(getTestRow(i, i));
+
+         when(clo.command()).thenReturn(new DeleteExactAllCommand(rows));
+
+         doAnswer(invocation -> {
+             MultiRowsResponse resp = invocation.getArgument(0);
+
+             if (existed) {
+                 assertEquals(KEY_COUNT, resp.getValues().size());
+
+                 for (BinaryRow binaryRow : resp.getValues()) {
+                     Row row = new Row(SCHEMA, binaryRow);
+
+                     int keyVal = row.intValue(0);
+
+                     assertTrue(keyVal < KEY_COUNT);
+
+                     assertEquals(keyVal, row.intValue(1));
+                 }
+             }
+             else
+                 assertTrue(resp.getValues().isEmpty());
+
+             return null;
+         }).when(clo).result(any());
+     }));
+ }
+
+ /**
+  * Sends a get-and-replace command with row {@code (i, i + 1)} for every key and checks the row
+  * returned in the response.
+  *
+  * @param existed {@code true} if the response must carry the previous row {@code (i, i)};
+  *      {@code false} if the response must carry no row.
+  */
+ private void getAndReplaceValues(boolean existed) {
+     commandListener.onWrite(iterator((i, clo) -> {
+         when(clo.command()).thenReturn(new GetAndReplaceCommand(getTestRow(i, i + 1)));
+
+         doAnswer(invocation -> {
+             SingleRowResponse resp = invocation.getArgument(0);
+
+             if (existed) {
+                 // Fail with a clear assertion rather than an NPE while reading a missing row.
+                 assertNotNull(resp.getValue());
+
+                 Row row = new Row(SCHEMA, resp.getValue());
+
+                 assertEquals(i, row.intValue(0));
+                 assertEquals(i, row.intValue(1));
+             }
+             else
+                 assertNull(resp.getValue());
+
+             return null;
+         }).when(clo).result(any());
+     }));
+ }
+
+ /**
+  * Sends a get-and-upsert command with row {@code (i, i)} for every key and checks the row
+  * returned in the response.
+  *
+  * @param existed {@code true} if the response must carry the previous row {@code (i, i + 1)};
+  *      {@code false} if the response must carry no row.
+  */
+ private void getAndUpsertValues(boolean existed) {
+     commandListener.onWrite(iterator((i, clo) -> {
+         when(clo.command()).thenReturn(new GetAndUpsertCommand(getTestRow(i, i)));
+
+         doAnswer(invocation -> {
+             SingleRowResponse resp = invocation.getArgument(0);
+
+             if (existed) {
+                 // Fail with a clear assertion rather than an NPE while reading a missing row.
+                 assertNotNull(resp.getValue());
+
+                 Row row = new Row(SCHEMA, resp.getValue());
+
+                 assertEquals(i, row.intValue(0));
+                 assertEquals(i + 1, row.intValue(1));
+             }
+             else
+                 assertNull(resp.getValue());
+
+             return null;
+         }).when(clo).result(any());
+     }));
+ }
+
+ /**
+  * Sends a get-and-delete command for every key and checks the row returned in the response.
+  *
+  * @param existed {@code true} if the response must carry the removed row {@code (i, i + 1)};
+  *      {@code false} if the response must carry no row.
+  */
+ private void getAndDeleteValues(boolean existed) {
+     commandListener.onWrite(iterator((i, clo) -> {
+         when(clo.command()).thenReturn(new GetAndDeleteCommand(getTestKey(i)));
+
+         doAnswer(invocation -> {
+             SingleRowResponse resp = invocation.getArgument(0);
+
+             if (existed) {
+                 // Fail with a clear assertion rather than an NPE while reading a missing row.
+                 assertNotNull(resp.getValue());
+
+                 Row row = new Row(SCHEMA, resp.getValue());
+
+                 assertEquals(i, row.intValue(0));
+                 assertEquals(i + 1, row.intValue(1));
+             }
+             else
+                 assertNull(resp.getValue());
+
+             return null;
+         }).when(clo).result(any());
+     }));
+ }
+
+ /**
+  * Sends a replace-if-exists command with row {@code (i, i + 1)} for every key and checks the
+  * boolean result passed to each closure.
+  *
+  * @param existed The result every command is expected to report: {@code true} if the rows are
+  *      stored, {@code false} otherwise.
+  */
+ private void putIfExistValues(boolean existed) {
+     commandListener.onWrite(iterator((key, closure) -> {
+         doAnswer(call -> {
+             boolean replaced = call.getArgument(0);
+
+             assertEquals(existed, replaced);
+
+             return null;
+         }).when(closure).result(any());
+
+         when(closure.command()).thenReturn(new ReplaceIfExistCommand(getTestRow(key, key + 1)));
+     }));
+ }
+
+ /**
+  * Sends a delete-exact command with row {@code (i, i + 1)} for every key and checks the
+  * boolean result passed to each closure.
+  *
+  * @param existed The result every command is expected to report: {@code true} if exactly these
+  *      rows are stored, {@code false} otherwise.
+  */
+ private void deleteExactValues(boolean existed) {
+     commandListener.onWrite(iterator((key, closure) -> {
+         doAnswer(call -> {
+             boolean removed = call.getArgument(0);
+
+             assertEquals(existed, removed);
+
+             return null;
+         }).when(closure).result(any());
+
+         when(closure.command()).thenReturn(new DeleteExactCommand(getTestRow(key, key + 1)));
+     }));
+ }
+
+ /**
+  * Sends a replace command built from rows {@code (i, i)} and {@code (i, i + 1)} for every key
+  * and checks the boolean result passed to each closure.
+  *
+  * @param existed The result every command is expected to report: {@code true} if the replace
+  *      is expected to succeed, {@code false} otherwise.
+  */
+ private void replaceValues(boolean existed) {
+     commandListener.onWrite(iterator((key, closure) -> {
+         doAnswer(call -> {
+             // The result must be a boolean before it can be compared with the expectation.
+             assertTrue(call.getArgument(0) instanceof Boolean);
+
+             boolean replaced = call.getArgument(0);
+
+             assertEquals(existed, replaced);
+
+             return null;
+         }).when(closure).result(any());
+
+         when(closure.command()).thenReturn(new ReplaceCommand(getTestRow(key, key), getTestRow(key, key + 1)));
+     }));
+ }
+
+ /**
+  * Builds a test row in which only the key field is filled.
+  *
+  * @param key Value of the key field.
+  * @return Row.
+  */
+ @NotNull private Row getTestKey(int key) {
+     // NOTE(review): the meaning of the 4096, 0, 0 arguments is not visible here - confirm against RowAssembler.
+     RowAssembler assembler = new RowAssembler(SCHEMA, 4096, 0, 0);
+
+     assembler.appendInt(key);
+
+     ByteBufferRow binary = new ByteBufferRow(assembler.build());
+
+     return new Row(SCHEMA, binary);
+ }
+
+ /**
+  * Builds a test row in which both the key and the value fields are filled.
+  *
+  * @param key Value of the key field.
+  * @param val Value of the value field.
+  * @return Row.
+  */
+ @NotNull private Row getTestRow(int key, int val) {
+     // NOTE(review): the meaning of the 4096, 0, 0 arguments is not visible here - confirm against RowAssembler.
+     RowAssembler assembler = new RowAssembler(SCHEMA, 4096, 0, 0);
+
+     // The key is appended first, then the value.
+     assembler.appendInt(key);
+     assembler.appendInt(val);
+
+     ByteBufferRow binary = new ByteBufferRow(assembler.build());
+
+     return new Row(SCHEMA, binary);
+ }
+}