You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tk...@apache.org on 2023/08/04 13:08:02 UTC
[ignite-3] branch main updated: IGNITE-19937 Remove BinaryRow#byteBuffer method (#2408)
This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new cdddfae611 IGNITE-19937 Remove BinaryRow#byteBuffer method (#2408)
cdddfae611 is described below
commit cdddfae611f9e69d67743dcf5d3ca068a94f7479
Author: Alexander Polovtcev <al...@gmail.com>
AuthorDate: Fri Aug 4 16:07:57 2023 +0300
IGNITE-19937 Remove BinaryRow#byteBuffer method (#2408)
---
.idea/codeStyles/Project.xml | 3 +-
.../apache/ignite/internal/schema/BinaryRow.java | 16 -----
.../ignite/internal/schema/BinaryRowImpl.java | 11 +--
.../ignite/internal/schema/ByteBufferRow.java | 84 ----------------------
.../org/apache/ignite/internal/schema/row/Row.java | 5 --
.../sql/engine/exec/UpdatableTableImpl.java | 53 +++++++-------
.../exec/rel/TableScanNodeExecutionTest.java | 3 +-
.../AbstractMvPartitionStorageConcurrencyTest.java | 16 ++---
.../internal/storage/BaseMvStoragesTest.java | 5 +-
.../storage/rocksdb/PartitionDataHelper.java | 6 +-
.../ignite/distributed/ItTablePersistenceTest.java | 21 +++---
.../ignite/distributed/ReplicaUnavailableTest.java | 27 ++++---
.../ignite/internal/table/ItColocationTest.java | 13 ++--
.../table/distributed/StorageUpdateHandler.java | 16 ++---
.../table/distributed/TableMessageGroup.java | 6 ++
.../distributed/command/UpdateAllCommand.java | 4 +-
.../table/distributed/command/UpdateCommand.java | 13 +++-
.../table/distributed/raft/PartitionListener.java | 2 +-
.../snapshot/incoming/IncomingSnapshotCopier.java | 7 +-
.../snapshot/message/SnapshotMvDataResponse.java | 4 +-
.../raft/snapshot/outgoing/OutgoingSnapshot.java | 24 ++++---
...owReplicaRequest.java => BinaryRowMessage.java} | 24 +++----
.../request/MultipleRowReplicaRequest.java | 12 ++--
.../request/SingleRowReplicaRequest.java | 6 +-
.../replication/request/SwapRowReplicaRequest.java | 10 ++-
.../replicator/PartitionReplicaListener.java | 80 ++++++++++++---------
.../distributed/storage/InternalTableImpl.java | 79 ++++++++++----------
.../internal/table/distributed/IndexBaseTest.java | 32 ++++-----
.../PartitionRaftCommandsSerializationTest.java | 41 ++++++-----
.../raft/PartitionCommandListenerTest.java | 40 ++++++-----
.../incoming/IncomingSnapshotCopierTest.java | 11 ++-
.../OutgoingSnapshotMvDataStreamingTest.java | 79 +++++++++++---------
.../PartitionReplicaListenerIndexLockingTest.java | 12 +++-
.../replication/PartitionReplicaListenerTest.java | 61 ++++++++--------
34 files changed, 387 insertions(+), 439 deletions(-)
diff --git a/.idea/codeStyles/Project.xml b/.idea/codeStyles/Project.xml
index 51cbc4108f..6f8eb3a91e 100644
--- a/.idea/codeStyles/Project.xml
+++ b/.idea/codeStyles/Project.xml
@@ -166,6 +166,7 @@
<option name="BINARY_OPERATION_SIGN_ON_NEXT_LINE" value="true" />
<option name="TERNARY_OPERATION_WRAP" value="1" />
<option name="TERNARY_OPERATION_SIGNS_ON_NEXT_LINE" value="true" />
+ <option name="KEEP_SIMPLE_LAMBDAS_IN_ONE_LINE" value="true" />
<option name="FOR_STATEMENT_WRAP" value="1" />
<option name="ARRAY_INITIALIZER_WRAP" value="1" />
<option name="WRAP_COMMENTS" value="true" />
@@ -589,4 +590,4 @@
</indentOptions>
</codeStyleSettings>
</code_scheme>
-</component>
+</component>
\ No newline at end of file
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index a519a48a23..336a1abcfd 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -18,23 +18,11 @@
package org.apache.ignite.internal.schema;
import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
/**
* Binary row interface. Data layout is described in packages' {@code README.md}.
*/
public interface BinaryRow {
- ByteOrder ORDER = ByteOrder.LITTLE_ENDIAN;
-
- /** Size of schema version field. */
- int SCHEMA_VERSION_FLD_LEN = Short.BYTES;
-
- /** Row schema version field offset. */
- int SCHEMA_VERSION_OFFSET = 0;
-
- /** Row binary tuple field offset. */
- int TUPLE_OFFSET = SCHEMA_VERSION_OFFSET + SCHEMA_VERSION_FLD_LEN;
-
/** Get row schema version. */
int schemaVersion();
@@ -46,8 +34,4 @@ public interface BinaryRow {
/** Get ByteBuffer slice representing the binary tuple. */
ByteBuffer tupleSlice();
-
- /** Returns the representation of this row as a Byte Buffer. */
- // TODO: remove this method, see https://issues.apache.org/jira/browse/IGNITE-19937
- ByteBuffer byteBuffer();
}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
index c7d41c452b..ee40a0a45f 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRowImpl.java
@@ -52,16 +52,7 @@ public class BinaryRowImpl implements BinaryRow {
@Override
public ByteBuffer tupleSlice() {
- return binaryTuple.duplicate().order(ORDER);
- }
-
- @Override
- public ByteBuffer byteBuffer() {
- return ByteBuffer.allocate(tupleSliceLength() + Short.BYTES)
- .order(ORDER)
- .putShort((short) schemaVersion())
- .put(tupleSlice())
- .rewind();
+ return binaryTuple.duplicate().order(BinaryTuple.ORDER);
}
@Override
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
deleted file mode 100644
index 2eb9ab6594..0000000000
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.schema;
-
-import static org.apache.ignite.internal.binarytuple.BinaryTupleCommon.ROW_HAS_VALUE_FLAG;
-
-import java.nio.ByteBuffer;
-
-/**
- * Heap byte buffer-based row.
- */
-// TODO: remove this class, see https://issues.apache.org/jira/browse/IGNITE-19937
-public class ByteBufferRow implements BinaryRow {
- /** Row buffer. */
- private final ByteBuffer buf;
-
- /**
- * Constructor.
- *
- * @param data Array representation of the row.
- */
- public ByteBufferRow(byte[] data) {
- this(ByteBuffer.wrap(data).order(ORDER));
- }
-
- /**
- * Constructor.
- *
- * @param buf Buffer representing the row.
- */
- public ByteBufferRow(ByteBuffer buf) {
- assert buf.order() == ORDER;
- assert buf.position() == 0;
-
- this.buf = buf;
- }
-
- /** {@inheritDoc} */
- @Override
- public int schemaVersion() {
- return Short.toUnsignedInt(buf.getShort(SCHEMA_VERSION_OFFSET));
- }
-
- @Override
- public boolean hasValue() {
- return (buf.get(TUPLE_OFFSET) & ROW_HAS_VALUE_FLAG) != 0;
- }
-
- /** {@inheritDoc} */
- @Override
- public ByteBuffer tupleSlice() {
- try {
- return buf.position(TUPLE_OFFSET).slice().order(ORDER);
- } finally {
- buf.position(0); // Reset bounds.
- }
- }
-
- /** {@inheritDoc} */
- @Override
- public ByteBuffer byteBuffer() {
- return buf.duplicate().order(ORDER);
- }
-
- @Override
- public int tupleSliceLength() {
- return buf.remaining() - TUPLE_OFFSET;
- }
-}
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index e3030422af..f8562916b9 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -113,11 +113,6 @@ public class Row extends BinaryTupleReader implements BinaryRowEx, SchemaAware,
return row.tupleSlice();
}
- @Override
- public ByteBuffer byteBuffer() {
- return row.byteBuffer();
- }
-
@Override
public int tupleSliceLength() {
return row.tupleSliceLength();
diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
index 310423aca4..2d15526ef8 100644
--- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
+++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/UpdatableTableImpl.java
@@ -23,7 +23,6 @@ import static org.apache.ignite.lang.ErrorGroups.Sql.CONSTRAINT_VIOLATION_ERR;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
@@ -47,6 +46,7 @@ import org.apache.ignite.internal.sql.engine.schema.TableDescriptor;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.TypeUtils;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.sql.SqlException;
@@ -78,7 +78,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
/**
* Constructor.
*
- * @param desc Table descriptor.
+ * @param desc Table descriptor.
*/
public UpdatableTableImpl(
int tableId,
@@ -137,7 +137,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(commitPartitionId)
- .binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
+ .binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
.requestType(RequestType.RW_UPSERT_ALL)
@@ -150,11 +150,16 @@ public final class UpdatableTableImpl implements UpdatableTable {
return CompletableFuture.allOf(futures);
}
- private static List<ByteBuffer> serializeBinaryRows(Collection<BinaryRow> rows) {
- var result = new ArrayList<ByteBuffer>(rows.size());
+ private static List<BinaryRowMessage> serializeBinaryRows(Collection<BinaryRow> rows) {
+ var result = new ArrayList<BinaryRowMessage>(rows.size());
for (BinaryRow row : rows) {
- result.add(row.byteBuffer());
+ BinaryRowMessage message = MESSAGES_FACTORY.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
+
+ result.add(message);
}
return result;
@@ -197,7 +202,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(commitPartitionId)
- .binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
+ .binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
.requestType(RequestType.RW_INSERT_ALL)
@@ -205,26 +210,26 @@ public final class UpdatableTableImpl implements UpdatableTable {
.build();
futures[batchNum++] = replicaService.invoke(nodeWithTerm.name(), request)
- .thenApply(result -> {
- Collection<BinaryRow> binaryRows = (Collection<BinaryRow>) result;
+ .thenApply(result -> {
+ Collection<BinaryRow> binaryRows = (Collection<BinaryRow>) result;
- if (binaryRows.isEmpty()) {
- return List.of();
- }
+ if (binaryRows.isEmpty()) {
+ return List.of();
+ }
- List<RowT> conflictRows = new ArrayList<>(binaryRows.size());
- IgniteTypeFactory typeFactory = ectx.getTypeFactory();
- RowHandler.RowFactory<RowT> rowFactory = handler.factory(
- ectx.getTypeFactory(),
- desc.insertRowType(typeFactory)
- );
+ List<RowT> conflictRows = new ArrayList<>(binaryRows.size());
+ IgniteTypeFactory typeFactory = ectx.getTypeFactory();
+ RowHandler.RowFactory<RowT> rowFactory = handler.factory(
+ ectx.getTypeFactory(),
+ desc.insertRowType(typeFactory)
+ );
- for (BinaryRow row : binaryRows) {
- conflictRows.add(rowConverter.toRow(ectx, row, rowFactory, null));
- }
+ for (BinaryRow row : binaryRows) {
+ conflictRows.add(rowConverter.toRow(ectx, row, rowFactory, null));
+ }
- return conflictRows;
- });
+ return conflictRows;
+ });
}
return handleInsertResults(handler, futures);
@@ -260,7 +265,7 @@ public final class UpdatableTableImpl implements UpdatableTable {
ReplicaRequest request = MESSAGES_FACTORY.readWriteMultiRowReplicaRequest()
.groupId(partGroupId)
.commitPartitionId(commitPartitionId)
- .binaryRowsBytes(serializeBinaryRows(partToRows.getValue()))
+ .binaryRowMessages(serializeBinaryRows(partToRows.getValue()))
.transactionId(txAttributes.id())
.term(nodeWithTerm.term())
.requestType(RequestType.RW_DELETE_ALL)
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index da376fd68c..eca7bca58a 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -38,7 +38,6 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryTuplePrefix;
-import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.ScannableTableImpl;
@@ -133,7 +132,7 @@ public class TableScanNodeExecutionTest extends AbstractExecutionTest {
private final int dataAmount;
- private final ByteBufferRow bbRow = new ByteBufferRow(new byte[1]);
+ private final BinaryRow bbRow = mock(BinaryRow.class);
private final CopyOnWriteArraySet<Integer> partitions = new CopyOnWriteArraySet<>();
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
index 03051669e1..53c9cc25e8 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageConcurrencyTest.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.storage;
-import static java.util.stream.Collectors.toCollection;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
@@ -25,10 +24,8 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.stream.Stream;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.jetbrains.annotations.Nullable;
@@ -203,13 +200,14 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
addAndCommit.perform(this, null);
- Collection<ByteBuffer> rows = Stream.of(TABLE_ROW, TABLE_ROW2)
- .map(BinaryRow::byteBuffer)
- .collect(toCollection(ConcurrentLinkedQueue::new));
+ Collection<BinaryRow> rows = new ConcurrentLinkedQueue<>();
+
+ rows.add(TABLE_ROW);
+ rows.add(TABLE_ROW2);
runRace(
- () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(), rows),
- () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow().byteBuffer(), rows)
+ () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow(), rows),
+ () -> assertRemoveRow(pollForVacuum(HybridTimestamp.MAX_VALUE).binaryRow(), rows)
);
assertNull(pollForVacuum(HybridTimestamp.MAX_VALUE));
@@ -220,7 +218,7 @@ public abstract class AbstractMvPartitionStorageConcurrencyTest extends BaseMvPa
}
}
- private void assertRemoveRow(ByteBuffer rowBytes, Collection<ByteBuffer> rows) {
+ private static void assertRemoveRow(@Nullable BinaryRow rowBytes, Collection<BinaryRow> rows) {
assertNotNull(rowBytes);
assertTrue(rows.remove(rowBytes), rowBytes.toString());
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
index 9a751332e5..b879bdfdb3 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvStoragesTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
@@ -77,7 +78,9 @@ public abstract class BaseMvStoragesTest {
protected static BinaryRow binaryRow(TestKey key, TestValue value) {
try {
- return kvMarshaller.marshal(key, value);
+ Row row = kvMarshaller.marshal(key, value);
+
+ return new BinaryRowImpl(row.schemaVersion(), row.tupleSlice());
} catch (MarshallerException e) {
throw new IgniteException(e);
}
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
index 9ea674b5e8..6c24c08689 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/PartitionDataHelper.java
@@ -36,7 +36,7 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.rocksdb.RocksUtils;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowImpl;
-import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.MvPartitionStorage.WriteClosure;
import org.apache.ignite.internal.storage.RowId;
@@ -77,8 +77,6 @@ public final class PartitionDataHelper implements ManuallyCloseable {
/** Value offset (if transaction state is present). */
static final int VALUE_OFFSET = VALUE_HEADER_SIZE;
- static final ByteOrder TABLE_ROW_BYTE_ORDER = ByteBufferRow.ORDER;
-
/** Thread-local direct buffer instance to read keys from RocksDB. */
static final ThreadLocal<ByteBuffer> MV_KEY_BUFFER = withInitial(() -> allocateDirect(MAX_KEY_SIZE).order(KEY_BYTE_ORDER));
@@ -260,7 +258,7 @@ public final class PartitionDataHelper implements ManuallyCloseable {
assert buffer.order() == ByteOrder.BIG_ENDIAN;
int schemaVersion = Short.toUnsignedInt(buffer.getShort());
- ByteBuffer binaryTupleSlice = buffer.slice().order(TABLE_ROW_BYTE_ORDER);
+ ByteBuffer binaryTupleSlice = buffer.slice().order(BinaryTuple.ORDER);
return new BinaryRowImpl(schemaVersion, binaryTupleSlice);
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
index f3c99a37ab..86ca7097e1 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTablePersistenceTest.java
@@ -30,7 +30,6 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import it.unimi.dsi.fastutil.ints.Int2ObjectMaps;
-import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.LinkedList;
@@ -55,6 +54,7 @@ import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.TestReplicationGroupId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -81,6 +81,7 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
import org.apache.ignite.internal.table.distributed.raft.PartitionListener;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
@@ -226,8 +227,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
int storageIndex = stoppedNodeIndex == 0 ? 1 : 0;
MvPartitionStorage partitionStorage = mvPartitionStorages.get(storageIndex);
- Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(partitionStorage);
- RowId rowId = primaryIndex.get(req0.binaryRowBytes());
+ Map<BinaryRow, RowId> primaryIndex = rowsToRowIds(partitionStorage);
+ RowId rowId = primaryIndex.get(req0.binaryRow());
BinaryRow row = partitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
@@ -235,13 +236,13 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
}
// Non-null binary row if UPSERT, otherwise it's implied that request type is DELETE.
- ByteBuffer binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRowBytes() : null;
+ BinaryRowMessage binaryRow = req0.requestType() == RequestType.RW_UPSERT ? req0.binaryRowMessage() : null;
UpdateCommand cmd = msgFactory.updateCommand()
.txId(req0.transactionId())
.tablePartitionId(tablePartitionId(new TablePartitionId(1, 0)))
.rowUuid(new RowId(0).uuid())
- .rowBuffer(binaryRow)
+ .rowMessage(binaryRow)
.safeTimeLong(hybridClock.nowLong())
.build();
@@ -303,11 +304,11 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
MvPartitionStorage storage = getListener(restarted, raftGroupId()).getMvStorage();
return () -> {
- Map<ByteBuffer, RowId> primaryIndex = rowsToRowIds(storage);
+ Map<BinaryRow, RowId> primaryIndex = rowsToRowIds(storage);
Row value = interactedAfterSnapshot ? SECOND_VALUE : FIRST_VALUE;
- RowId rowId = primaryIndex.get(value.byteBuffer());
+ RowId rowId = primaryIndex.get(new BinaryRowImpl(value.schemaVersion(), value.tupleSlice()));
if (rowId == null) {
return false;
@@ -323,8 +324,8 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
};
}
- private static Map<ByteBuffer, RowId> rowsToRowIds(MvPartitionStorage storage) {
- Map<ByteBuffer, RowId> result = new HashMap<>();
+ private static Map<BinaryRow, RowId> rowsToRowIds(MvPartitionStorage storage) {
+ Map<BinaryRow, RowId> result = new HashMap<>();
RowId rowId = storage.closestRowId(RowId.lowestRowId(0));
@@ -332,7 +333,7 @@ public class ItTablePersistenceTest extends ItAbstractListenerSnapshotTest<Parti
BinaryRow binaryRow = storage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
if (binaryRow != null) {
- result.put(binaryRow.byteBuffer(), rowId);
+ result.put(binaryRow, rowId);
}
RowId incremented = rowId.increment();
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 51a589ae8d..e7dab29686 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -28,11 +28,11 @@ import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
import static org.apache.ignite.raft.jraft.test.TestUtils.getLocalAddress;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -51,12 +51,14 @@ import org.apache.ignite.internal.replicator.exception.ReplicationTimeoutExcepti
import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
@@ -149,7 +151,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId)
.timestampLong(clock.nowLong())
- .binaryRowBytes(createKeyValueRow(1L, 1L))
+ .binaryRowMessage(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
@@ -191,7 +193,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId)
.timestampLong(clock.nowLong())
- .binaryRowBytes(createKeyValueRow(1L, 1L))
+ .binaryRowMessage(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
@@ -223,7 +225,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId)
.timestampLong(clock.nowLong())
- .binaryRowBytes(createKeyValueRow(1L, 1L))
+ .binaryRowMessage(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
@@ -242,10 +244,10 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
e1 = e;
}
- assertTrue(e0 != null);
+ assertNotNull(e0);
assertTrue(unwrapCause(e0) instanceof ReplicationException, e0.toString());
- assertTrue(e1 != null);
+ assertNotNull(e1);
assertTrue(unwrapCause(e1) instanceof ReplicationException, e1.toString());
}
@@ -280,7 +282,7 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
.transactionId(TestTransactionIds.newTransactionId())
.commitPartitionId(tablePartitionId)
.timestampLong(clock.nowLong())
- .binaryRowBytes(createKeyValueRow(1L, 1L))
+ .binaryRowMessage(createKeyValueRow(1L, 1L))
.requestType(RequestType.RW_GET)
.build();
@@ -292,17 +294,22 @@ public class ReplicaUnavailableTest extends IgniteAbstractTest {
e0 = e;
}
- assertTrue(e0 != null);
+ assertNotNull(e0);
assertTrue(unwrapCause(e0) instanceof ReplicationTimeoutException, e0.toString());
assertEquals(REPLICA_TIMEOUT_ERR, ((ReplicationTimeoutException) unwrapCause(e0)).code());
}
- private static ByteBuffer createKeyValueRow(long id, long value) {
+ private BinaryRowMessage createKeyValueRow(long id, long value) {
RowAssembler rowBuilder = new RowAssembler(SCHEMA);
rowBuilder.appendLong(id);
rowBuilder.appendLong(value);
- return rowBuilder.build().byteBuffer();
+ BinaryRow row = rowBuilder.build();
+
+ return tableMessagesFactory.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
}
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index b412c75a3f..0f5fa3bf5d 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -35,7 +35,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.nio.ByteBuffer;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
@@ -64,7 +63,6 @@ import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRowEx;
-import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeType;
import org.apache.ignite.internal.schema.NativeTypeSpec;
@@ -78,6 +76,7 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
@@ -197,7 +196,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
RaftGroupService r = groupRafts.get(request.groupId());
if (request instanceof ReadWriteMultiRowReplicaRequest) {
- Map<UUID, ByteBuffer> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRowsBytes()
+ Map<UUID, BinaryRowMessage> rows = ((ReadWriteMultiRowReplicaRequest) request).binaryRowMessages()
.stream()
.collect(toMap(row -> TestTransactionIds.newTransactionId(), Function.identity()));
@@ -221,7 +220,7 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
.build()
)
.rowUuid(UUID.randomUUID())
- .rowBuffer(((ReadWriteSingleRowReplicaRequest) request).binaryRow().byteBuffer())
+ .rowMessage(((ReadWriteSingleRowReplicaRequest) request).binaryRowMessage())
.txId(TestTransactionIds.newTransactionId())
.build());
}
@@ -394,14 +393,14 @@ public class ItColocationTest extends BaseIgniteAbstractTest {
partsMap.merge(part, 1, (cnt, ignore) -> ++cnt);
}
- assertEquals(partsMap.size(), CMDS_MAP.size());
+ assertEquals(CMDS_MAP.size(), partsMap.size());
CMDS_MAP.forEach((p, set) -> {
UpdateAllCommand cmd = (UpdateAllCommand) CollectionUtils.first(set);
assertEquals(partsMap.get(p), cmd.rowsToUpdate().size(), () -> "part=" + p + ", set=" + set);
- cmd.rowsToUpdate().values().forEach(byteBuffer -> {
- Row r = new Row(schema, new ByteBufferRow(byteBuffer));
+ cmd.rowsToUpdate().values().forEach(rowMessage -> {
+ Row r = new Row(schema, rowMessage.asBinaryRow());
assertEquals(INT_TABLE.partition(r), p);
});
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 110be7852c..146ff6620a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.table.distributed;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -32,13 +31,13 @@ import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
@@ -102,20 +101,19 @@ public class StorageUpdateHandler {
* @param txId Transaction id.
* @param rowUuid Row UUID.
* @param commitPartitionId Commit partition id.
- * @param rowBuffer Row buffer.
+ * @param row Row.
* @param onApplication Callback on application.
*/
public void handleUpdate(
UUID txId,
UUID rowUuid,
TablePartitionId commitPartitionId,
- @Nullable ByteBuffer rowBuffer,
+ @Nullable BinaryRow row,
@Nullable Consumer<RowId> onApplication
) {
indexUpdateHandler.waitIndexes();
storage.runConsistently(locker -> {
- BinaryRow row = rowBuffer != null ? new ByteBufferRow(rowBuffer) : null;
RowId rowId = new RowId(partitionId, rowUuid);
int commitTblId = commitPartitionId.tableId();
int commitPartId = commitPartitionId.partitionId();
@@ -151,7 +149,7 @@ public class StorageUpdateHandler {
*/
public void handleUpdateAll(
UUID txId,
- Map<UUID, ByteBuffer> rowsToUpdate,
+ Map<UUID, BinaryRowMessage> rowsToUpdate,
TablePartitionId commitPartitionId,
@Nullable Consumer<Collection<RowId>> onReplication
) {
@@ -165,11 +163,11 @@ public class StorageUpdateHandler {
List<RowId> rowIds = new ArrayList<>();
// Sort IDs to prevent deadlock. Natural UUID order matches RowId order within the same partition.
- SortedMap<UUID, ByteBuffer> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate);
+ SortedMap<UUID, BinaryRowMessage> sortedRowsToUpdateMap = new TreeMap<>(rowsToUpdate);
- for (Map.Entry<UUID, ByteBuffer> entry : sortedRowsToUpdateMap.entrySet()) {
+ for (Map.Entry<UUID, BinaryRowMessage> entry : sortedRowsToUpdateMap.entrySet()) {
RowId rowId = new RowId(partitionId, entry.getKey());
- BinaryRow row = entry.getValue() != null ? new ByteBufferRow(entry.getValue()) : null;
+ BinaryRow row = entry.getValue() == null ? null : entry.getValue().asBinaryRow();
locker.lock(rowId);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
index 4ccb9ab6e9..667a58fde0 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableMessageGroup.java
@@ -34,6 +34,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
@@ -143,6 +144,11 @@ public interface TableMessageGroup {
*/
short BINARY_TUPLE = 17;
+ /**
+ * Message type for {@link BinaryRowMessage}.
+ */
+ short BINARY_ROW_MESSAGE = 18;
+
/**
* Message types for Table module RAFT commands.
*/
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
index 9654deff74..4755ac0502 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateAllCommand.java
@@ -17,10 +17,10 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.nio.ByteBuffer;
import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.network.annotations.Transferable;
/**
@@ -30,5 +30,5 @@ import org.apache.ignite.network.annotations.Transferable;
public interface UpdateAllCommand extends PartitionCommand {
TablePartitionIdMessage tablePartitionId();
- Map<UUID, ByteBuffer> rowsToUpdate();
+ Map<UUID, BinaryRowMessage> rowsToUpdate();
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
index 358432d1d7..0515214a55 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/UpdateCommand.java
@@ -17,9 +17,10 @@
package org.apache.ignite.internal.table.distributed.command;
-import java.nio.ByteBuffer;
import java.util.UUID;
+import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.network.annotations.Transferable;
import org.jetbrains.annotations.Nullable;
@@ -33,5 +34,13 @@ public interface UpdateCommand extends PartitionCommand {
UUID rowUuid();
@Nullable
- ByteBuffer rowBuffer();
+ BinaryRowMessage rowMessage();
+
+ /** Returns the row to update or {@code null} if the row should be removed. */
+ @Nullable
+ default BinaryRow row() {
+ BinaryRowMessage message = rowMessage();
+
+ return message == null ? null : message.asBinaryRow();
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index b3746397c0..41c1ee0d3c 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -231,7 +231,7 @@ public class PartitionListener implements RaftGroupListener {
return;
}
- storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), cmd.rowBuffer(),
+ storageUpdateHandler.handleUpdate(cmd.txId(), cmd.rowUuid(), cmd.tablePartitionId().asTablePartitionId(), cmd.row(),
rowId -> {
txsPendingRowIds.computeIfAbsent(cmd.txId(), entry -> new TreeSet<>()).add(rowId);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
index 0dacd347ac..fd5a9a3c37 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopier.java
@@ -21,7 +21,6 @@ import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestamp;
-import java.nio.ByteBuffer;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -31,7 +30,6 @@ import java.util.function.Function;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
@@ -46,6 +44,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.network.ClusterNode;
@@ -393,9 +392,9 @@ public class IncomingSnapshotCopier extends SnapshotCopier {
private void writeVersion(ResponseEntry entry, int i) {
RowId rowId = new RowId(partId(), entry.rowId());
- ByteBuffer rowVersion = entry.rowVersions().get(i);
+ BinaryRowMessage rowVersion = entry.rowVersions().get(i);
- BinaryRow binaryRow = rowVersion == null ? null : new ByteBufferRow(rowVersion.rewind());
+ BinaryRow binaryRow = rowVersion == null ? null : rowVersion.asBinaryRow();
PartitionAccess partition = partitionSnapshotStorage.partition();
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java
index 523b0f4834..ca115fdb24 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/message/SnapshotMvDataResponse.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.message;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.table.TableRow;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.network.NetworkMessage;
import org.apache.ignite.network.annotations.Transferable;
import org.jetbrains.annotations.Nullable;
@@ -48,7 +48,7 @@ public interface SnapshotMvDataResponse extends NetworkMessage {
UUID rowId();
/** List of {@link TableRow}s for a given {@link #rowId()}. */
- List<ByteBuffer> rowVersions();
+ List<BinaryRowMessage> rowVersions();
/**
* List of commit timestamps for all committed versions. Might be smaller than {@link #rowVersions()} if there's a write-intent
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
index 5e947e32dc..82544f9f06 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java
@@ -17,7 +17,6 @@
package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
-import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
@@ -42,6 +41,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataResponse;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -264,12 +264,13 @@ public class OutgoingSnapshot {
return totalBytesAfter;
}
- private static long rowSizeInBytes(List<ByteBuffer> rowVersions) {
+ private static long rowSizeInBytes(List<BinaryRowMessage> rowVersions) {
long sum = 0;
- for (ByteBuffer buf : rowVersions) {
- if (buf != null) {
- sum += buf.remaining();
+ for (BinaryRowMessage rowMessage : rowVersions) {
+ if (rowMessage != null) {
+ // Schema version is an unsigned short.
+ sum += rowMessage.binaryTuple().remaining() + Short.BYTES;
}
}
@@ -318,7 +319,7 @@ public class OutgoingSnapshot {
}
int count = rowVersionsN2O.size();
- List<ByteBuffer> buffers = new ArrayList<>(count);
+ List<BinaryRowMessage> rowVersions = new ArrayList<>(count);
int commitTimestampsCount = rowVersionsN2O.get(0).isWriteIntent() ? count - 1 : count;
long[] commitTimestamps = new long[commitTimestampsCount];
@@ -331,7 +332,14 @@ public class OutgoingSnapshot {
ReadResult version = rowVersionsN2O.get(i);
BinaryRow row = version.binaryRow();
- buffers.add(row == null ? null : row.byteBuffer());
+ BinaryRowMessage rowMessage = row == null
+ ? null
+ : MESSAGES_FACTORY.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
+
+ rowVersions.add(rowMessage);
if (version.isWriteIntent()) {
assert i == 0 : rowVersionsN2O;
@@ -346,7 +354,7 @@ public class OutgoingSnapshot {
return MESSAGES_FACTORY.responseEntry()
.rowId(rowId.uuid())
- .rowVersions(buffers)
+ .rowVersions(rowVersions)
.timestamps(commitTimestamps)
.txId(transactionId)
.commitTableId(commitTableId)
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java
similarity index 63%
copy from modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
copy to modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java
index 98a4656483..3bb8cd8057 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/BinaryRowMessage.java
@@ -18,22 +18,22 @@
package org.apache.ignite.internal.table.distributed.replication.request;
import java.nio.ByteBuffer;
-import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
-import org.apache.ignite.network.annotations.Marshallable;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
+import org.apache.ignite.internal.table.distributed.TableMessageGroup;
+import org.apache.ignite.network.NetworkMessage;
+import org.apache.ignite.network.annotations.Transferable;
/**
- * Single-row replica request.
+ * Message for transferring a {@link BinaryRow}.
*/
-public interface SingleRowReplicaRequest extends ReplicaRequest {
- ByteBuffer binaryRowBytes();
+@Transferable(TableMessageGroup.BINARY_ROW_MESSAGE)
+public interface BinaryRowMessage extends NetworkMessage {
+ ByteBuffer binaryTuple();
- default BinaryRow binaryRow() {
- return new ByteBufferRow(binaryRowBytes());
- }
+ int schemaVersion();
- @Marshallable
- RequestType requestType();
+ default BinaryRow asBinaryRow() {
+ return new BinaryRowImpl(schemaVersion(), binaryTuple());
+ }
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
index 5b7c39b720..415f73253f 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/MultipleRowReplicaRequest.java
@@ -17,12 +17,10 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
@@ -30,18 +28,18 @@ import org.apache.ignite.network.annotations.Marshallable;
* Multiple row replica request.
*/
public interface MultipleRowReplicaRequest extends ReplicaRequest {
- Collection<ByteBuffer> binaryRowsBytes();
+ Collection<BinaryRowMessage> binaryRowMessages();
/**
* Deserializes binary row byte buffers into binary rows.
*/
default Collection<BinaryRow> binaryRows() {
- Collection<ByteBuffer> binaryRowsBytes = binaryRowsBytes();
+ Collection<BinaryRowMessage> binaryRowMessages = binaryRowMessages();
- var result = new ArrayList<BinaryRow>(binaryRowsBytes.size());
+ var result = new ArrayList<BinaryRow>(binaryRowMessages.size());
- for (ByteBuffer buffer : binaryRowsBytes) {
- result.add(new ByteBufferRow(buffer));
+ for (BinaryRowMessage message : binaryRowMessages) {
+ result.add(message.asBinaryRow());
}
return result;
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
index 98a4656483..a2f033cf0e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SingleRowReplicaRequest.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import java.nio.ByteBuffer;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
@@ -28,10 +26,10 @@ import org.apache.ignite.network.annotations.Marshallable;
* Single-row replica request.
*/
public interface SingleRowReplicaRequest extends ReplicaRequest {
- ByteBuffer binaryRowBytes();
+ BinaryRowMessage binaryRowMessage();
default BinaryRow binaryRow() {
- return new ByteBufferRow(binaryRowBytes());
+ return binaryRowMessage().asBinaryRow();
}
@Marshallable
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
index d088c702d3..1b5a82f6ec 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replication/request/SwapRowReplicaRequest.java
@@ -17,10 +17,8 @@
package org.apache.ignite.internal.table.distributed.replication.request;
-import java.nio.ByteBuffer;
import org.apache.ignite.internal.replicator.message.ReplicaRequest;
import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
import org.apache.ignite.network.annotations.Marshallable;
@@ -28,16 +26,16 @@ import org.apache.ignite.network.annotations.Marshallable;
* Dual row replica request.
*/
public interface SwapRowReplicaRequest extends ReplicaRequest {
- ByteBuffer binaryRowBytes();
+ BinaryRowMessage binaryRowMessage();
default BinaryRow binaryRow() {
- return new ByteBufferRow(binaryRowBytes());
+ return binaryRowMessage().asBinaryRow();
}
- ByteBuffer oldBinaryRowBytes();
+ BinaryRowMessage oldBinaryRowMessage();
default BinaryRow oldBinaryRow() {
- return new ByteBufferRow(oldBinaryRowBytes());
+ return oldBinaryRowMessage().asBinaryRow();
}
@Marshallable
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 5f44953212..1979ef7730 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -112,6 +112,7 @@ import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommandBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyReplicaRequest;
@@ -141,6 +142,7 @@ import org.apache.ignite.internal.util.ArrayUtils;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.CursorUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.Lazy;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ErrorGroups.Replicator;
@@ -924,13 +926,13 @@ public class PartitionReplicaListener implements ReplicaListener {
return committedReadResult.binaryRow();
})
- .thenComposeAsync(resolvedReadResult -> {
- if (resolvedReadResult != null && indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) {
- result.add(resolvedReadResult);
- }
+ .thenComposeAsync(resolvedReadResult -> {
+ if (resolvedReadResult != null && indexRowMatches(indexRow, resolvedReadResult, schemaAwareIndexStorage)) {
+ result.add(resolvedReadResult);
+ }
- return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result);
- }, scanRequestExecutor);
+ return continueReadOnlyIndexScan(schemaAwareIndexStorage, cursor, timestamp, batchSize, result);
+ }, scanRequestExecutor);
}
/**
@@ -1404,9 +1406,9 @@ public class PartitionReplicaListener implements ReplicaListener {
checkWriteIntentsBelongSameTx(writeIntents);
return resolveTxState(
- new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()),
- writeIntent.transactionId(),
- ts)
+ new TablePartitionId(writeIntent.commitTableId(), writeIntent.commitPartitionId()),
+ writeIntent.transactionId(),
+ ts)
.thenApply(readLastCommitted -> {
if (readLastCommitted) {
for (ReadResult wi : writeIntents) {
@@ -1468,7 +1470,7 @@ public class PartitionReplicaListener implements ReplicaListener {
/**
* Tests row values for equality.
*
- * @param row Row.
+ * @param row Row.
* @param row2 Row.
* @return {@code true} if rows are equal.
*/
@@ -1533,7 +1535,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return allOf(rowIdLockFuts).thenCompose(ignore -> {
- Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>();
+ Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
Collection<BinaryRow> result = new ArrayList<>();
int futNum = 0;
@@ -1572,7 +1574,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return allOf(deleteExactLockFuts).thenCompose(ignore -> {
- Map<UUID, ByteBuffer> rowIdsToDelete = new HashMap<>();
+ Map<UUID, BinaryRowMessage> rowIdsToDelete = new HashMap<>();
Collection<BinaryRow> result = new ArrayList<>();
int futNum = 0;
@@ -1642,10 +1644,14 @@ public class PartitionReplicaListener implements ReplicaListener {
insertLockFuts[idx++] = takeLocksForInsert(entry.getValue(), entry.getKey(), txId);
}
- Map<UUID, ByteBuffer> convertedMap = rowsToInsert.entrySet().stream().collect(
- Collectors.toMap(
+ Map<UUID, BinaryRowMessage> convertedMap = rowsToInsert.entrySet().stream()
+ .collect(Collectors.toMap(
e -> e.getKey().uuid(),
- e -> e.getValue().byteBuffer()));
+ e -> MSG_FACTORY.binaryRowMessage()
+ .binaryTuple(e.getValue().tupleSlice())
+ .schemaVersion(e.getValue().schemaVersion())
+ .build()
+ ));
return allOf(insertLockFuts)
.thenCompose(ignored -> applyUpdateAllCommand(
@@ -1679,14 +1685,14 @@ public class PartitionReplicaListener implements ReplicaListener {
}
return allOf(rowIdFuts).thenCompose(ignore -> {
- Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>();
+ Map<UUID, BinaryRowMessage> rowsToUpdate = IgniteUtils.newHashMap(request.binaryRowMessages().size());
int futNum = 0;
- for (BinaryRow row : request.binaryRows()) {
+ for (BinaryRowMessage row : request.binaryRowMessages()) {
RowId lockedRow = rowIdFuts[futNum++].join().get1();
- rowsToUpdate.put(lockedRow.uuid(), row.byteBuffer());
+ rowsToUpdate.put(lockedRow.uuid(), row);
}
if (rowsToUpdate.isEmpty()) {
@@ -1747,7 +1753,7 @@ public class PartitionReplicaListener implements ReplicaListener {
cmd.txId(),
cmd.rowUuid(),
cmd.tablePartitionId().asTablePartitionId(),
- cmd.rowBuffer(),
+ cmd.row(),
rowId -> txsPendingRowIds.compute(cmd.txId(), (k, v) -> {
if (v == null) {
v = new TreeSet<>();
@@ -1963,7 +1969,7 @@ public class PartitionReplicaListener implements ReplicaListener {
/**
* Takes all required locks on a key, before upserting.
*
- * @param txId Transaction id.
+ * @param txId Transaction id.
* @return Future completes with tuple {@link RowId} and collection of {@link Lock}.
*/
private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForUpdate(BinaryRow binaryRow, RowId rowId, UUID txId) {
@@ -2035,7 +2041,7 @@ public class PartitionReplicaListener implements ReplicaListener {
/**
* Takes all required locks on a key, before deleting the value.
*
- * @param txId Transaction id.
+ * @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value for remove.
*/
private CompletableFuture<RowId> takeLocksForDeleteExact(BinaryRow expectedRow, RowId rowId, BinaryRow actualRow, UUID txId) {
@@ -2055,7 +2061,7 @@ public class PartitionReplicaListener implements ReplicaListener {
/**
* Takes all required locks on a key, before deleting the value.
*
- * @param txId Transaction id.
+ * @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value for the key.
*/
private CompletableFuture<RowId> takeLocksForDelete(BinaryRow binaryRow, RowId rowId, UUID txId) {
@@ -2068,7 +2074,7 @@ public class PartitionReplicaListener implements ReplicaListener {
/**
* Takes all required locks on a key, before getting the value.
*
- * @param txId Transaction id.
+ * @param txId Transaction id.
* @return Future completes with {@link RowId} or {@code null} if there is no value for the key.
*/
private CompletableFuture<RowId> takeLocksForGet(RowId rowId, UUID txId) {
@@ -2124,7 +2130,7 @@ public class PartitionReplicaListener implements ReplicaListener {
/**
* Takes all required locks on a key, before updating the value.
*
- * @param txId Transaction id.
+ * @param txId Transaction id.
* @return Future completes with tuple {@link RowId} and collection of {@link Lock} or {@code null} if there is no suitable row.
*/
private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForReplace(BinaryRow expectedRow, BinaryRow oldRow,
@@ -2284,8 +2290,7 @@ public class PartitionReplicaListener implements ReplicaListener {
}
/**
- * Resolves a read result to the matched row.
- * If the result does not match any row, the method returns a future to {@code null}.
+ * Resolves a read result to the matched row. If the result does not match any row, the method returns a future to {@code null}.
*
* @param readResult Read result.
* @param timestamp Timestamp.
@@ -2298,9 +2303,9 @@ public class PartitionReplicaListener implements ReplicaListener {
Supplier<BinaryRow> lastCommitted
) {
return resolveTxState(
- new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()),
- readResult.transactionId(),
- timestamp)
+ new TablePartitionId(readResult.commitTableId(), readResult.commitPartitionId()),
+ readResult.transactionId(),
+ timestamp)
.thenApply(readLastCommitted -> {
if (readLastCommitted) {
return lastCommitted.get();
@@ -2371,7 +2376,12 @@ public class PartitionReplicaListener implements ReplicaListener {
.safeTimeLong(hybridClock.nowLong());
if (row != null) {
- bldr.rowBuffer(row.byteBuffer());
+ BinaryRowMessage rowMessage = MSG_FACTORY.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
+
+ bldr.rowMessage(rowMessage);
}
return bldr.build();
@@ -2385,7 +2395,7 @@ public class PartitionReplicaListener implements ReplicaListener {
* @param txId Transaction ID.
* @return Constructed {@link UpdateAllCommand} object.
*/
- private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, ByteBuffer> rowsToUpdate, UUID txId) {
+ private UpdateAllCommand updateAllCommand(TablePartitionId tablePartId, Map<UUID, BinaryRowMessage> rowsToUpdate, UUID txId) {
return MSG_FACTORY.updateAllCommand()
.tablePartitionId(tablePartitionId(tablePartId))
.rowsToUpdate(rowsToUpdate)
@@ -2418,8 +2428,8 @@ public class PartitionReplicaListener implements ReplicaListener {
}
/**
- * Class that stores a list of futures for operations that has happened in a specific transaction.
- * Also, the class has a property {@code state} that represents a transaction state.
+ * Class that stores a list of futures for operations that has happened in a specific transaction. Also, the class has a property
+ * {@code state} that represents a transaction state.
*/
private static class TxCleanupReadyFutureList {
/**
@@ -2428,8 +2438,8 @@ public class PartitionReplicaListener implements ReplicaListener {
final Map<RequestType, List<CompletableFuture<?>>> futures = new EnumMap<>(RequestType.class);
/**
- * Transaction state. {@code TxState#ABORTED} and {@code TxState#COMMITED} match the final transaction states.
- * If the property is {@code null} the transaction is in pending state.
+ * Transaction state. {@code TxState#ABORTED} and {@code TxState#COMMITED} match the final transaction states. If the property is
+ * {@code null} the transaction is in pending state.
*/
TxState state;
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 94aa2eadd5..ce3b671430 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -32,7 +32,6 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.net.ConnectException;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
@@ -74,7 +73,9 @@ import org.apache.ignite.internal.schema.BinaryTuplePrefix;
import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
+import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadOnlyScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteScanRetrieveBatchReplicaRequestBuilder;
@@ -237,9 +238,9 @@ public class InternalTableImpl implements InternalTable {
);
}
- final boolean implicit = tx == null;
+ boolean implicit = tx == null;
- final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
+ InternalTransaction tx0 = implicit ? txManager.begin() : tx;
int partId = partitionId(row);
@@ -300,7 +301,7 @@ public class InternalTableImpl implements InternalTable {
);
}
- final boolean implicit = tx == null;
+ boolean implicit = tx == null;
// It's possible to have null txState if transaction isn't started yet.
if (!implicit && !(tx.state() == TxState.PENDING || tx.state() == null)) {
@@ -308,7 +309,7 @@ public class InternalTableImpl implements InternalTable {
"The operation is attempted for completed transaction"));
}
- final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
+ InternalTransaction tx0 = implicit ? txManager.begin() : tx;
Int2ObjectMap<RowBatch> rowBatchByPartitionId = toRowBatchByPartitionId(keyRows);
@@ -532,7 +533,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
- .binaryRowBytes(keyRow.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(keyRow))
.commitPartitionId(commitPart)
.transactionId(txo.id())
.term(term)
@@ -554,7 +555,7 @@ public class InternalTableImpl implements InternalTable {
return replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlySingleRowReplicaRequest()
.groupId(partGroupId)
- .binaryRowBytes(keyRow.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(keyRow))
.requestType(RequestType.RO_GET)
.readTimestampLong(readTimestamp.longValue())
.build()
@@ -579,7 +580,7 @@ public class InternalTableImpl implements InternalTable {
tx,
(commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
- .binaryRowsBytes(serializeBinaryRows(keyRows0))
+ .binaryRowMessages(serializeBinaryRows(keyRows0))
.commitPartitionId(commitPart)
.transactionId(txo.id())
.term(term)
@@ -603,30 +604,36 @@ public class InternalTableImpl implements InternalTable {
for (Int2ObjectMap.Entry<RowBatch> partitionRowBatch : rowBatchByPartitionId.int2ObjectEntrySet()) {
ReplicationGroupId partGroupId = raftGroupServiceByPartitionId.get(partitionRowBatch.getIntKey()).groupId();
- CompletableFuture<Object> fut = replicaSvc.invoke(recipientNode, tableMessagesFactory.readOnlyMultiRowReplicaRequest()
+ ReadOnlyMultiRowReplicaRequest request = tableMessagesFactory.readOnlyMultiRowReplicaRequest()
.groupId(partGroupId)
- .binaryRowsBytes(serializeBinaryRows(partitionRowBatch.getValue().requestedRows))
+ .binaryRowMessages(serializeBinaryRows(partitionRowBatch.getValue().requestedRows))
.requestType(RequestType.RO_GET_ALL)
.readTimestampLong(readTimestamp.longValue())
- .build()
- );
+ .build();
- partitionRowBatch.getValue().resultFuture = fut;
+ partitionRowBatch.getValue().resultFuture = replicaSvc.invoke(recipientNode, request);
}
return collectMultiRowsResponsesWithRestoreOrder(rowBatchByPartitionId.values());
}
- private static List<ByteBuffer> serializeBinaryRows(Collection<? extends BinaryRow> rows) {
- var result = new ArrayList<ByteBuffer>(rows.size());
+ private List<BinaryRowMessage> serializeBinaryRows(Collection<? extends BinaryRow> rows) {
+ var result = new ArrayList<BinaryRowMessage>(rows.size());
for (BinaryRow row : rows) {
- result.add(row.byteBuffer());
+ result.add(serializeBinaryRow(row));
}
return result;
}
+ private BinaryRowMessage serializeBinaryRow(BinaryRow row) {
+ return tableMessagesFactory.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> upsert(BinaryRowEx row, InternalTransaction tx) {
@@ -636,7 +643,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_UPSERT)
@@ -680,7 +687,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_UPSERT)
@@ -698,7 +705,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_INSERT)
@@ -716,7 +723,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowsBytes(serializeBinaryRows(keyRows0))
+ .binaryRowMessages(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_INSERT_ALL)
@@ -735,7 +742,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_REPLACE_IF_EXIST)
@@ -753,8 +760,8 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSwapRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .oldBinaryRowBytes(oldRow.byteBuffer())
- .binaryRowBytes(newRow.byteBuffer())
+ .oldBinaryRowMessage(serializeBinaryRow(oldRow))
+ .binaryRowMessage(serializeBinaryRow(newRow))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_REPLACE)
@@ -772,7 +779,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_REPLACE)
@@ -790,7 +797,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowBytes(keyRow.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(keyRow))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE)
@@ -808,7 +815,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowBytes(oldRow.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(oldRow))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_EXACT)
@@ -826,7 +833,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, txo, groupId, term) -> tableMessagesFactory.readWriteSingleRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(serializeBinaryRow(row))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_GET_AND_DELETE)
@@ -844,7 +851,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowsBytes(serializeBinaryRows(keyRows0))
+ .binaryRowMessages(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_ALL)
@@ -866,7 +873,7 @@ public class InternalTableImpl implements InternalTable {
(commitPart, keyRows0, txo, groupId, term) -> tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowsBytes(serializeBinaryRows(keyRows0))
+ .binaryRowMessages(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_DELETE_EXACT_ALL)
@@ -982,18 +989,16 @@ public class InternalTableImpl implements InternalTable {
// won't be rolled back automatically - it's up to the user or outer engine.
if (tx != null && tx.isReadOnly()) {
throw new TransactionException(
- new TransactionException(
- TX_FAILED_READ_WRITE_OPERATION_ERR,
- "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + '}'
- )
+ TX_FAILED_READ_WRITE_OPERATION_ERR,
+ "Failed to enlist read-write operation into read-only transaction txId={" + tx.id() + '}'
);
}
validatePartitionIndex(partId);
- final boolean implicit = tx == null;
+ boolean implicit = tx == null;
- final InternalTransaction tx0 = implicit ? txManager.begin() : tx;
+ InternalTransaction tx0 = implicit ? txManager.begin() : tx;
return new PartitionScanPublisher(
(scanId, batchSize) -> enlistCursorInTx(
@@ -1346,7 +1351,7 @@ public class InternalTableImpl implements InternalTable {
Function<CompletableFuture<Void>, CompletableFuture<Void>> onClose;
/** True when the publisher has a subscriber, false otherwise. */
- private AtomicBoolean subscribed;
+ private final AtomicBoolean subscribed;
/**
* The constructor.
@@ -1623,7 +1628,7 @@ public class InternalTableImpl implements InternalTable {
return tableMessagesFactory.readWriteMultiRowReplicaRequest()
.groupId(groupId)
.commitPartitionId(commitPart)
- .binaryRowsBytes(serializeBinaryRows(keyRows0))
+ .binaryRowMessages(serializeBinaryRows(keyRows0))
.transactionId(txo.id())
.term(term)
.requestType(RequestType.RW_UPSERT_ALL)
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index e5eb1e768b..7307f934e4 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.storage.index.impl.TestHashIndexStorage;
import org.apache.ignite.internal.storage.index.impl.TestSortedIndexStorage;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
@@ -55,13 +56,15 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
/**
- * Base test for indexes. Sets up a table with (int, string) key and (int, string) value and
- * three indexes: primary key, hash index over value columns and sorted index over value columns.
+ * Base test for indexes. Sets up a table with (int, string) key and (int, string) value and three indexes: primary key, hash index over
+ * value columns and sorted index over value columns.
*/
@ExtendWith(ConfigurationExtension.class)
public abstract class IndexBaseTest extends BaseMvStoragesTest {
protected static final int PARTITION_ID = 0;
+ private static final TableMessagesFactory MSG_FACTORY = new TableMessagesFactory();
+
private static final BinaryTupleSchema TUPLE_SCHEMA = BinaryTupleSchema.createRowSchema(schemaDescriptor);
private static final BinaryTupleSchema PK_INDEX_SCHEMA = BinaryTupleSchema.createKeySchema(schemaDescriptor);
@@ -160,13 +163,7 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest {
static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable BinaryRow row) {
TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
- handler.handleUpdate(
- TX_ID,
- rowUuid,
- partitionId,
- row == null ? null : row.byteBuffer(),
- (unused) -> {}
- );
+ handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {});
}
static BinaryRow defaultRow() {
@@ -226,22 +223,23 @@ public abstract class IndexBaseTest extends BaseMvStoragesTest {
USE_UPDATE {
@Override
void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) {
- handler.handleUpdate(
- TX_ID,
- rowUuid,
- partitionId,
- row == null ? null : row.byteBuffer(),
- (unused) -> {}
- );
+ handler.handleUpdate(TX_ID, rowUuid, partitionId, row, (unused) -> {});
}
},
/** Uses updateAll api. */
USE_UPDATE_ALL {
@Override
void addWrite(StorageUpdateHandler handler, TablePartitionId partitionId, UUID rowUuid, @Nullable BinaryRow row) {
+ BinaryRowMessage rowMessage = row == null
+ ? null
+ : MSG_FACTORY.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
+
handler.handleUpdateAll(
TX_ID,
- singletonMap(rowUuid, row == null ? null : row.byteBuffer()),
+ singletonMap(rowUuid, rowMessage),
partitionId,
(unused) -> {}
);
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
index 60d728e269..c4a661e310 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/command/PartitionRaftCommandsSerializationTest.java
@@ -22,7 +22,6 @@ import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Locale;
@@ -32,14 +31,15 @@ import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.raft.Command;
-import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.schema.marshaller.KvMarshaller;
import org.apache.ignite.internal.schema.marshaller.reflection.ReflectionMarshallerFactory;
+import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.table.distributed.TableMessageGroup;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tostring.IgniteToStringInclude;
import org.apache.ignite.internal.tostring.S;
@@ -52,13 +52,10 @@ import org.junit.jupiter.api.Test;
*/
public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
/** Key-value marshaller for tests. */
- protected static KvMarshaller<TestKey, TestValue> kvMarshaller;
+ private static KvMarshaller<TestKey, TestValue> kvMarshaller;
/** Message factory to create messages - RAFT commands. */
- private TableMessagesFactory msgFactory = new TableMessagesFactory();
-
- /** Factory for replica messages. */
- private ReplicaMessagesFactory replicaMessagesFactory = new ReplicaMessagesFactory();
+ private final TableMessagesFactory msgFactory = new TableMessagesFactory();
@BeforeAll
static void beforeAll() {
@@ -84,15 +81,13 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
.build()
)
.rowUuid(UUID.randomUUID())
- .rowBuffer(byteBufferFromBinaryRow(1))
+ .rowMessage(binaryRowMessage(1))
.txId(TestTransactionIds.newTransactionId())
.build();
UpdateCommand readCmd = copyCommand(cmd);
- assertEquals(cmd.txId(), readCmd.txId());
- assertEquals(cmd.rowUuid(), readCmd.rowUuid());
- assertEquals(cmd.rowBuffer(), readCmd.rowBuffer());
+ assertEquals(cmd, readCmd);
}
@Test
@@ -111,15 +106,15 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
assertEquals(cmd.txId(), readCmd.txId());
assertEquals(cmd.rowUuid(), readCmd.rowUuid());
- assertNull(readCmd.rowBuffer());
+ assertNull(readCmd.rowMessage());
}
@Test
public void testUpdateAllCommand() throws Exception {
- Map<UUID, ByteBuffer> rowsToUpdate = new HashMap<>();
+ Map<UUID, BinaryRowMessage> rowsToUpdate = new HashMap<>();
for (int i = 0; i < 10; i++) {
- rowsToUpdate.put(TestTransactionIds.newTransactionId(), byteBufferFromBinaryRow(i));
+ rowsToUpdate.put(TestTransactionIds.newTransactionId(), binaryRowMessage(i));
}
var cmd = msgFactory.updateAllCommand()
@@ -136,7 +131,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
assertEquals(cmd.txId(), readCmd.txId());
- for (Map.Entry<UUID, ByteBuffer> entry : cmd.rowsToUpdate().entrySet()) {
+ for (Map.Entry<UUID, BinaryRowMessage> entry : cmd.rowsToUpdate().entrySet()) {
assertTrue(readCmd.rowsToUpdate().containsKey(entry.getKey()));
var readVal = readCmd.rowsToUpdate().get(entry.getKey());
@@ -148,7 +143,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
@Test
public void testRemoveAllCommand() throws Exception {
- Map<UUID, ByteBuffer> rowsToRemove = new HashMap<>();
+ Map<UUID, BinaryRowMessage> rowsToRemove = new HashMap<>();
for (int i = 0; i < 10; i++) {
rowsToRemove.put(TestTransactionIds.newTransactionId(), null);
@@ -245,7 +240,7 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
.txId(updateCommand.txId())
.rowUuid(updateCommand.rowUuid())
.tablePartitionId(updateCommand.tablePartitionId())
- .rowBuffer(updateCommand.rowBuffer())
+ .rowMessage(updateCommand.rowMessage())
.build();
} else if (cmd instanceof UpdateAllCommand) {
UpdateAllCommand updateCommand = (UpdateAllCommand) cmd;
@@ -262,8 +257,16 @@ public class PartitionRaftCommandsSerializationTest extends IgniteAbstractTest {
}
}
- private static ByteBuffer byteBufferFromBinaryRow(int id) throws Exception {
- return kvMarshaller.marshal(new TestKey(id, String.valueOf(id)), new TestValue(id, String.valueOf(id))).byteBuffer();
+ private BinaryRowMessage binaryRowMessage(int id) throws Exception {
+ Row row = kvMarshaller.marshal(
+ new TestKey(id, String.valueOf(id)),
+ new TestValue(id, String.valueOf(id))
+ );
+
+ return msgFactory.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
}
/**
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
index f1d103dc99..d7c4f76f78 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/PartitionCommandListenerTest.java
@@ -38,7 +38,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.Serializable;
-import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
@@ -91,6 +90,7 @@ import org.apache.ignite.internal.table.distributed.command.TxCleanupCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.WorkDirectory;
import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
@@ -621,14 +621,12 @@ public class PartitionCommandListenerTest {
* Inserts all rows.
*/
private void insertAll() {
- Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT);
+ Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
UUID txId = TestTransactionIds.newTransactionId();
var commitPartId = new TablePartitionId(1, PARTITION_ID);
for (int i = 0; i < KEY_COUNT; i++) {
- Row row = getTestRow(i, i);
-
- rows.put(TestTransactionIds.newTransactionId(), row.byteBuffer());
+ rows.put(TestTransactionIds.newTransactionId(), getTestRow(i, i));
}
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -660,12 +658,12 @@ public class PartitionCommandListenerTest {
private void updateAll(Function<Integer, Integer> keyValueMapper) {
UUID txId = TestTransactionIds.newTransactionId();
var commitPartId = new TablePartitionId(1, PARTITION_ID);
- Map<UUID, ByteBuffer> rows = new HashMap<>(KEY_COUNT);
+ Map<UUID, BinaryRowMessage> rows = new HashMap<>(KEY_COUNT);
for (int i = 0; i < KEY_COUNT; i++) {
- Row row = getTestRow(i, keyValueMapper.apply(i));
+ BinaryRowMessage row = getTestRow(i, keyValueMapper.apply(i));
- rows.put(readRow(row).uuid(), row.byteBuffer());
+ rows.put(readRow(row.asBinaryRow()).uuid(), row);
}
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -695,12 +693,12 @@ public class PartitionCommandListenerTest {
private void deleteAll() {
UUID txId = TestTransactionIds.newTransactionId();
var commitPartId = new TablePartitionId(1, PARTITION_ID);
- Map<UUID, ByteBuffer> keyRows = new HashMap<>(KEY_COUNT);
+ Map<UUID, BinaryRowMessage> keyRows = new HashMap<>(KEY_COUNT);
for (int i = 0; i < KEY_COUNT; i++) {
- Row row = getTestRow(i, i);
+ BinaryRowMessage row = getTestRow(i, i);
- keyRows.put(readRow(row).uuid(), null);
+ keyRows.put(readRow(row.asBinaryRow()).uuid(), null);
}
HybridTimestamp commitTimestamp = hybridClock.now();
@@ -734,8 +732,8 @@ public class PartitionCommandListenerTest {
commandListener.onWrite(iterator((i, clo) -> {
UUID txId = TestTransactionIds.newTransactionId();
- Row row = getTestRow(i, keyValueMapper.apply(i));
- RowId rowId = readRow(row);
+ BinaryRowMessage row = getTestRow(i, keyValueMapper.apply(i));
+ RowId rowId = readRow(row.asBinaryRow());
assertNotNull(rowId);
@@ -749,7 +747,7 @@ public class PartitionCommandListenerTest {
.tableId(1)
.partitionId(PARTITION_ID).build())
.rowUuid(rowId.uuid())
- .rowBuffer(row.byteBuffer())
+ .rowMessage(row)
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
.build());
@@ -779,7 +777,7 @@ public class PartitionCommandListenerTest {
commandListener.onWrite(iterator((i, clo) -> {
UUID txId = TestTransactionIds.newTransactionId();
- Row row = getTestRow(i, i);
+ BinaryRow row = getTestRow(i, i).asBinaryRow();
RowId rowId = readRow(row);
assertNotNull(rowId);
@@ -857,7 +855,6 @@ public class PartitionCommandListenerTest {
commandListener.onWrite(iterator((i, clo) -> {
UUID txId = TestTransactionIds.newTransactionId();
- Row row = getTestRow(i, i);
txIds.add(txId);
when(clo.index()).thenReturn(raftIndex.incrementAndGet());
@@ -868,7 +865,7 @@ public class PartitionCommandListenerTest {
.tableId(1)
.partitionId(PARTITION_ID).build())
.rowUuid(UUID.randomUUID())
- .rowBuffer(row.byteBuffer())
+ .rowMessage(getTestRow(i, i))
.txId(txId)
.safeTimeLong(hybridClock.nowLong())
.build());
@@ -909,13 +906,18 @@ public class PartitionCommandListenerTest {
*
* @return Row.
*/
- private Row getTestRow(int key, int val) {
+ private BinaryRowMessage getTestRow(int key, int val) {
RowAssembler rowBuilder = new RowAssembler(SCHEMA);
rowBuilder.appendInt(key);
rowBuilder.appendInt(val);
- return new Row(SCHEMA, rowBuilder.build());
+ BinaryRow row = rowBuilder.build();
+
+ return msgFactory.binaryRowMessage()
+ .binaryTuple(row.tupleSlice())
+ .schemaVersion(row.schemaVersion())
+ .build();
}
private void invokeBatchedCommand(WriteCommand cmd) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
index 43a7b826ca..fbae8d3e71 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/incoming/IncomingSnapshotCopierTest.java
@@ -44,7 +44,6 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -85,6 +84,7 @@ import org.apache.ignite.internal.table.distributed.raft.snapshot.message.Snapsh
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotMvDataResponse.ResponseEntry;
import org.apache.ignite.internal.table.distributed.raft.snapshot.message.SnapshotTxDataRequest;
import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.tx.TxMeta;
import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -352,7 +352,7 @@ public class IncomingSnapshotCopierTest {
Collections.reverse(readResults);
- List<ByteBuffer> rowVersions = new ArrayList<>();
+ List<BinaryRowMessage> rowVersions = new ArrayList<>();
long[] timestamps = new long[readResults.size() + (readResults.get(0).isWriteIntent() ? -1 : 0)];
UUID txId = null;
@@ -361,7 +361,12 @@ public class IncomingSnapshotCopierTest {
int j = 0;
for (ReadResult readResult : readResults) {
- rowVersions.add(readResult.binaryRow().byteBuffer());
+ BinaryRowMessage rowMessage = TABLE_MSG_FACTORY.binaryRowMessage()
+ .binaryTuple(readResult.binaryRow().tupleSlice())
+ .schemaVersion(readResult.binaryRow().schemaVersion())
+ .build();
+
+ rowVersions.add(rowMessage);
if (readResult.isWriteIntent()) {
txId = readResult.transactionId();
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
index 05f5199784..56afb037c2 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotMvDataStreamingTest.java
@@ -30,12 +30,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
-import org.apache.ignite.internal.schema.ByteBufferRow;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
@@ -52,6 +54,9 @@ import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
class OutgoingSnapshotMvDataStreamingTest {
+ private static final BinaryRow ROW_1 = new BinaryRowImpl(0, ByteBuffer.wrap(new byte[]{1}));
+ private static final BinaryRow ROW_2 = new BinaryRowImpl(0, ByteBuffer.wrap(new byte[]{2}));
+
@Mock
private PartitionAccess partitionAccess;
@@ -93,10 +98,10 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsCommittedAndUncommittedVersionsFromStorage() {
- ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
ReadResult version2 = ReadResult.createFromWriteIntent(
rowId1,
- new ByteBufferRow(new byte[]{2}),
+ ROW_2,
transactionId,
commitTableId,
42,
@@ -118,8 +123,8 @@ class OutgoingSnapshotMvDataStreamingTest {
assertThat(responseRow.timestamps(), is(equalTo(new long[] {version1.commitTimestamp().longValue()})));
assertThat(responseRow.rowVersions(), hasSize(2));
- assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{1}));
- assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{2}));
+ assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_1));
+ assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_2));
}
private void configurePartitionAccessToHaveExactlyOneRowWith(List<ReadResult> versions) {
@@ -148,8 +153,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void reversesOrderOfVersionsObtainedFromPartition() {
- ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
configurePartitionAccessToHaveExactlyOneRowWith(List.of(version1, version2));
@@ -164,14 +169,14 @@ class OutgoingSnapshotMvDataStreamingTest {
);
assertThat(responseRow.rowVersions(), hasSize(2));
- assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{2}));
- assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{1}));
+ assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_2));
+ assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_1));
}
@Test
void iteratesRowsInPartition() {
- ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId2, ROW_2, clock.now());
when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1));
@@ -184,9 +189,9 @@ class OutgoingSnapshotMvDataStreamingTest {
assertThat(response.rows(), hasSize(2));
assertThat(response.rows().get(0).rowVersions(), hasSize(1));
- assertThat(response.rows().get(0).rowVersions().get(0).array(), is(new byte[]{1}));
+ assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(ROW_1));
assertThat(response.rows().get(1).rowVersions(), hasSize(1));
- assertThat(response.rows().get(1).rowVersions().get(0).array(), is(new byte[]{2}));
+ assertThat(response.rows().get(1).rowVersions().get(0).asBinaryRow(), is(ROW_2));
}
@Test
@@ -209,10 +214,10 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsCommittedAndUncommittedVersionsFromQueue() {
- ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now());
ReadResult version2 = ReadResult.createFromWriteIntent(
rowIdOutOfOrder,
- new ByteBufferRow(new byte[]{2}),
+ ROW_2,
transactionId,
commitTableId,
42,
@@ -244,8 +249,8 @@ class OutgoingSnapshotMvDataStreamingTest {
assertThat(responseRow.timestamps(), is(equalTo(new long[] {version1.commitTimestamp().longValue()})));
assertThat(responseRow.rowVersions(), hasSize(2));
- assertThat(responseRow.rowVersions().get(0).array(), is(new byte[]{1}));
- assertThat(responseRow.rowVersions().get(1).array(), is(new byte[]{2}));
+ assertThat(responseRow.rowVersions().get(0).asBinaryRow(), is(ROW_1));
+ assertThat(responseRow.rowVersions().get(1).asBinaryRow(), is(ROW_2));
}
private void configureClosestRowIdToBeEmpty() {
@@ -254,8 +259,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsOutOfOrderRowsWithHighestPriority() {
- ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version1));
@@ -309,7 +314,7 @@ class OutgoingSnapshotMvDataStreamingTest {
void doesNotSendWriteIntentTimestamp() {
ReadResult version = ReadResult.createFromWriteIntent(
rowId1,
- new ByteBufferRow(new byte[]{1}),
+ ROW_1,
transactionId,
commitTableId,
42,
@@ -334,8 +339,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void mvDataHandlingRespectsBatchSizeHintForMessagesFromPartition() {
- ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(rowId2, new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId2, ROW_2, clock.now());
when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1));
@@ -349,8 +354,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void mvDataHandlingRespectsBatchSizeHintForOutOfOrderMessages() {
- ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowIdOutOfOrder, ROW_1, clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version1));
@@ -373,8 +378,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void mvDataResponseThatIsNotLastHasFinishFalse() {
- ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2));
@@ -387,7 +392,9 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void sendsRowsFromPartitionBiggerThanHint() {
- ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[1000]), clock.now());
+ var row = new BinaryRowImpl(0, ByteBuffer.allocate(1000));
+
+ ReadResult version = ReadResult.createFromCommitted(rowId1, row, clock.now());
configurePartitionAccessToHaveExactlyOneRowWith(List.of(version));
@@ -395,12 +402,14 @@ class OutgoingSnapshotMvDataStreamingTest {
assertThat(response.rows(), hasSize(1));
assertThat(response.rows().get(0).rowVersions(), hasSize(1));
- assertThat(response.rows().get(0).rowVersions().get(0).limit(), is(1000));
+ assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(row));
}
@Test
void sendsRowsFromOutOfOrderQueueBiggerThanHint() {
- ReadResult version = ReadResult.createFromCommitted(rowIdOutOfOrder, new ByteBufferRow(new byte[1000]), clock.now());
+ var row = new BinaryRowImpl(0, ByteBuffer.allocate(1000));
+
+ ReadResult version = ReadResult.createFromCommitted(rowIdOutOfOrder, row, clock.now());
when(partitionAccess.getAllRowVersions(rowIdOutOfOrder)).thenReturn(List.of(version));
@@ -418,7 +427,7 @@ class OutgoingSnapshotMvDataStreamingTest {
assertThat(response.rows(), hasSize(1));
assertThat(response.rows().get(0).rowVersions(), hasSize(1));
- assertThat(response.rows().get(0).rowVersions().get(0).limit(), is(1000));
+ assertThat(response.rows().get(0).rowVersions().get(0).asBinaryRow(), is(row));
}
@Test
@@ -434,8 +443,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void lastSentRowIdIsPassed() {
- ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2));
@@ -454,8 +463,8 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void notYetSentRowIdIsNotPassed() {
- ReadResult version1 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
- ReadResult version2 = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{2}), clock.now());
+ ReadResult version1 = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
+ ReadResult version2 = ReadResult.createFromCommitted(rowId1, ROW_2, clock.now());
when(partitionAccess.closestRowId(lowestRowId)).thenReturn(rowId1);
when(partitionAccess.getAllRowVersions(rowId1)).thenReturn(List.of(version1, version2));
@@ -474,7 +483,7 @@ class OutgoingSnapshotMvDataStreamingTest {
@Test
void anyRowIdIsPassedForFinishedSnapshot() {
- ReadResult version = ReadResult.createFromCommitted(rowId1, new ByteBufferRow(new byte[]{1}), clock.now());
+ ReadResult version = ReadResult.createFromCommitted(rowId1, ROW_1, clock.now());
configurePartitionAccessToHaveExactlyOneRowWith(List.of(version));
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
index eb73abcd65..c1def6368f 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerIndexLockingTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage
import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replicator.PartitionReplicaListener;
import org.apache.ignite.internal.table.distributed.replicator.PlacementDriver;
import org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
@@ -253,7 +254,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
.term(1L)
.commitPartitionId(PARTITION_ID)
.transactionId(TRANSACTION_ID)
- .binaryRowBytes(testBinaryRow.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(testBinaryRow))
.requestType(arg.type)
.build());
@@ -299,7 +300,7 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
.term(1L)
.commitPartitionId(PARTITION_ID)
.transactionId(TRANSACTION_ID)
- .binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList()))
+ .binaryRowMessages(rows.stream().map(PartitionReplicaListenerIndexLockingTest::binaryRowMessage).collect(toList()))
.requestType(arg.type)
.build());
@@ -397,6 +398,13 @@ public class PartitionReplicaListenerIndexLockingTest extends IgniteAbstractTest
};
}
+ private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) {
+ return TABLE_MESSAGES_FACTORY.binaryRowMessage()
+ .binaryTuple(binaryRow.tupleSlice())
+ .schemaVersion(binaryRow.schemaVersion())
+ .build();
+ }
+
static class ReadWriteTestArg {
private final RequestType type;
private final LockMode expectedLockOnUniqueHash;
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index e9fbfe4e29..0b6fdd5e17 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.BinaryRowConverter;
+import org.apache.ignite.internal.schema.BinaryRowImpl;
import org.apache.ignite.internal.schema.BinaryTuple;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.NativeTypes;
@@ -115,6 +116,7 @@ import org.apache.ignite.internal.table.distributed.gc.GcUpdateHandler;
import org.apache.ignite.internal.table.distributed.index.IndexBuilder;
import org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.raft.PartitionDataStorage;
+import org.apache.ignite.internal.table.distributed.replication.request.BinaryRowMessage;
import org.apache.ignite.internal.table.distributed.replication.request.BinaryTupleMessage;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.IncompatibleSchemaAbortException;
@@ -513,7 +515,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
.readTimestampLong(clock.nowLong())
- .binaryRowBytes(testBinaryKey.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(testBinaryKey))
.requestType(RequestType.RO_GET)
.build());
@@ -536,7 +538,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
.readTimestampLong(clock.nowLong())
- .binaryRowBytes(testBinaryKey.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(testBinaryKey))
.requestType(RequestType.RO_GET)
.build());
@@ -559,7 +561,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
.readTimestampLong(clock.nowLong())
- .binaryRowBytes(testBinaryKey.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(testBinaryKey))
.requestType(RequestType.RO_GET)
.build());
@@ -581,7 +583,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
.readTimestampLong(clock.nowLong())
- .binaryRowBytes(testBinaryKey.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(testBinaryKey))
.requestType(RequestType.RO_GET)
.build());
@@ -604,7 +606,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
CompletableFuture<?> fut = partitionReplicaListener.invoke(TABLE_MESSAGES_FACTORY.readOnlySingleRowReplicaRequest()
.groupId(grpId)
.readTimestampLong(clock.nowLong())
- .binaryRowBytes(testBinaryKey.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(testBinaryKey))
.requestType(RequestType.RO_GET)
.build());
@@ -985,7 +987,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(requestType)
- .binaryRowBytes(binaryRow.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(binaryRow))
.term(1L)
.commitPartitionId(commitPartitionId())
.build()
@@ -1001,7 +1003,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(requestType)
- .binaryRowsBytes(binaryRows.stream().map(BinaryRow::byteBuffer).collect(toList()))
+ .binaryRowMessages(binaryRows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList()))
.term(1L)
.commitPartitionId(commitPartitionId())
.build()
@@ -1022,7 +1024,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(RequestType.RW_INSERT)
- .binaryRowBytes(binaryRow.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(binaryRow))
.term(1L)
.commitPartitionId(commitPartitionId())
.build();
@@ -1049,7 +1051,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(txId)
.requestType(RequestType.RW_UPSERT_ALL)
- .binaryRowsBytes(asList(binaryRow0.byteBuffer(), binaryRow1.byteBuffer()))
+ .binaryRowMessages(asList(binaryRowMessage(binaryRow0), binaryRowMessage(binaryRow1)))
.term(1L)
.commitPartitionId(commitPartitionId())
.build();
@@ -1072,7 +1074,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
- if (rowEquals(binaryRow, row)) {
+ if (binaryRow.equals(row)) {
found = true;
}
}
@@ -1083,7 +1085,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
BinaryRow row = testMvPartitionStorage.read(rowId, HybridTimestamp.MAX_VALUE).binaryRow();
- assertTrue(row == null || !rowEquals(row, binaryRow));
+ assertTrue(row == null || !row.equals(binaryRow));
}
}
@@ -1226,11 +1228,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
? (upsertAfterDelete ? br1 : null)
: (insertFirst ? br1 : null);
- if (expected == null) {
- assertNull(res);
- } else {
- assertTrue(rowEquals(expected, res));
- }
+ assertEquals(expected, res);
}
cleanup(tx1);
@@ -1465,8 +1463,8 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.transactionId(targetTxId)
.requestType(RequestType.RW_REPLACE)
- .oldBinaryRowBytes(binaryRow(key, new TestValue(1, "v1"), kvMarshaller).byteBuffer())
- .binaryRowBytes(binaryRow(key, new TestValue(3, "v3"), kvMarshaller).byteBuffer())
+ .oldBinaryRowMessage(binaryRowMessage(binaryRow(key, new TestValue(1, "v1"), kvMarshaller)))
+ .binaryRowMessage(binaryRowMessage(binaryRow(key, new TestValue(3, "v3"), kvMarshaller)))
.term(1L)
.commitPartitionId(commitPartitionId())
.build()
@@ -1528,7 +1526,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RW_UPSERT)
.transactionId(txId)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(row))
.term(1L)
.commitPartitionId(new TablePartitionId(tblId, partId))
.build()
@@ -1540,7 +1538,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RW_DELETE)
.transactionId(txId)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(row))
.term(1L)
.commitPartitionId(new TablePartitionId(tblId, partId))
.build()
@@ -1552,7 +1550,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RO_GET)
.readTimestampLong(readTimestamp)
- .binaryRowBytes(row.byteBuffer())
+ .binaryRowMessage(binaryRowMessage(row))
.build()
);
@@ -1564,7 +1562,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
.groupId(grpId)
.requestType(RequestType.RO_GET_ALL)
.readTimestampLong(readTimestamp)
- .binaryRowsBytes(rows.stream().map(BinaryRow::byteBuffer).collect(toList()))
+ .binaryRowMessages(rows.stream().map(PartitionReplicaListenerTest::binaryRowMessage).collect(toList()))
.build()
);
@@ -1619,11 +1617,7 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
protected BinaryRow binaryRow(int i) {
- try {
- return kvMarshaller.marshal(new TestKey(i, "k" + i), new TestValue(i, "v" + i));
- } catch (MarshallerException e) {
- throw new AssertionError(e);
- }
+ return binaryRow(new TestKey(i, "k" + i), new TestValue(i, "v" + i));
}
private BinaryRow binaryRow(TestKey key, TestValue value) {
@@ -1632,7 +1626,9 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
private static BinaryRow binaryRow(TestKey key, TestValue value, KvMarshaller<TestKey, TestValue> marshaller) {
try {
- return marshaller.marshal(key, value);
+ Row row = marshaller.marshal(key, value);
+
+ return new BinaryRowImpl(row.schemaVersion(), row.tupleSlice());
} catch (MarshallerException e) {
throw new AssertionError(e);
}
@@ -1654,10 +1650,11 @@ public class PartitionReplicaListenerTest extends IgniteAbstractTest {
}
}
- private static boolean rowEquals(BinaryRow row1, BinaryRow row2) {
- return row1.schemaVersion() == row2.schemaVersion()
- && row1.hasValue() == row2.hasValue()
- && row1.tupleSlice().equals(row2.tupleSlice());
+ private static BinaryRowMessage binaryRowMessage(BinaryRow binaryRow) {
+ return TABLE_MESSAGES_FACTORY.binaryRowMessage()
+ .binaryTuple(binaryRow.tupleSlice())
+ .schemaVersion(binaryRow.schemaVersion())
+ .build();
}
/**