You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/12/19 15:28:14 UTC
[ignite-3] branch main updated: IGNITE-14793 Removed temporary messages serialization code (#1457)
This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new de69515565 IGNITE-14793 Removed temporary messages serialization code (#1457)
de69515565 is described below
commit de69515565be0d97af237dd84d70563a61f86d7a
Author: Vadim Pakhnushev <86...@users.noreply.github.com>
AuthorDate: Mon Dec 19 18:28:08 2022 +0300
IGNITE-14793 Removed temporary messages serialization code (#1457)
---
.../apache/ignite/internal/schema/BinaryRow.java | 10 -
.../ignite/internal/schema/ByteBufferRow.java | 14 --
.../org/apache/ignite/internal/schema/row/Row.java | 13 --
.../ignite/internal/table/ItColocationTest.java | 3 +-
.../table/distributed/command/CommandUtils.java | 258 ---------------------
.../command/response/MultiRowsResponse.java | 66 ------
.../command/response/SingleRowResponse.java | 62 -----
.../table/type/NumericTypesSerializerTest.java | 22 +-
8 files changed, 7 insertions(+), 441 deletions(-)
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index af762964f9..5556948f42 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.schema;
-import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
/**
@@ -82,14 +80,6 @@ public interface BinaryRow {
*/
ByteBuffer valueSlice();
- /**
- * Writes binary row to given stream.
- *
- * @param stream Stream to write to.
- * @throws IOException If write operation fails.
- */
- void writeTo(OutputStream stream) throws IOException;
-
/**
* Get byte array of the row.
*/
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 925e216240..a7bdc9e8d2 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -17,12 +17,8 @@
package org.apache.ignite.internal.schema;
-import java.io.IOException;
-import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
/**
* Heap byte buffer-based row.
@@ -74,16 +70,6 @@ public class ByteBufferRow implements BinaryRow {
return buf.getInt(KEY_HASH_FIELD_OFFSET);
}
- /** {@inheritDoc} */
- @Override
- public void writeTo(OutputStream stream) throws IOException {
- WritableByteChannel channel = Channels.newChannel(stream);
-
- channel.write(buf);
-
- buf.rewind();
- }
-
/** {@inheritDoc} */
@Override
public ByteBuffer keySlice() {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 6f134bc834..67072320ef 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -17,8 +17,6 @@
package org.apache.ignite.internal.schema.row;
-import java.io.IOException;
-import java.io.OutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
@@ -770,17 +768,6 @@ public class Row implements BinaryRowEx, SchemaAware, InternalTuple {
return row.valueSlice();
}
- /**
- * Writes binary row to given stream.
- *
- * @param stream Stream to write to.
- * @throws IOException If write operation fails.
- */
- @Override
- public void writeTo(OutputStream stream) throws IOException {
- row.writeTo(stream);
- }
-
/**
* Read bytes by offset.
*
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 629018f065..ec3ac28631 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
-import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
@@ -174,7 +173,7 @@ public class ItColocationTest {
});
if (cmd instanceof UpdateAllCommand) {
- return completedFuture(new MultiRowsResponse(List.of()).getValues());
+ return completedFuture(List.of());
} else {
return completedFuture(true);
}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
deleted file mode 100644
index 8f5a1a9bf3..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.command;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.lang.ErrorGroups.Common;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * This is an utility class for serialization cache tuples. It will be removed after another way for serialization is implemented into the
- * network layer.
- * TODO: Remove it after (IGNITE-14793)
- */
-public class CommandUtils {
- /** The logger. */
- private static final IgniteLogger LOG = Loggers.forClass(CommandUtils.class);
-
- /**
- * Writes a list of rows to byte array.
- *
- * @param rows Collection of rows.
- * @return Rows data.
- */
- public static byte[] rowsToBytes(Collection<BinaryRow> rows) {
- if (rows == null || rows.isEmpty()) {
- return null;
- }
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- for (BinaryRow row : rows) {
- if (row == null) {
- baos.write(intToBytes(0));
- } else {
- byte[] bytes = rowToBytes(row);
-
- baos.write(intToBytes(bytes.length));
- baos.write(bytes);
- }
- }
-
- baos.flush();
-
- return baos.toByteArray();
- } catch (IOException e) {
- LOG.debug("unable to write rows to stream [rowsCount={}]", e, rows.size());
-
- throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
- }
- }
-
- /**
- * Writes a row to byte array.
- *
- * @param row Row.
- * @return Row bytes.
- */
- public static byte[] rowToBytes(@Nullable BinaryRow row) {
- if (row == null) {
- return null;
- }
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- row.writeTo(baos);
-
- baos.flush();
-
- return baos.toByteArray();
- } catch (IOException e) {
- LOG.debug("Unable to write row to stream [row={}]", e, row);
-
- throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
- }
- }
-
- /**
- * Reads the keys from a byte array.
- *
- * @param bytes Byte array.
- * @param consumer Consumer for binary row.
- */
- public static void readRows(byte[] bytes, Consumer<BinaryRow> consumer) {
- if (bytes == null || bytes.length == 0) {
- return;
- }
-
- try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
- byte[] lenBytes = new byte[4];
-
- byte[] rowBytes;
-
- int read;
-
- while ((read = bais.read(lenBytes)) != -1) {
- assert read == 4;
-
- int len = bytesToInt(lenBytes);
-
- if (len == 0) {
- consumer.accept(null);
-
- continue;
- }
-
- rowBytes = new byte[len];
-
- read = bais.read(rowBytes);
-
- assert read == len;
-
- consumer.accept(new ByteBufferRow(rowBytes));
- }
- } catch (IOException e) {
- LOG.debug("Unable to read rows from stream", e);
-
- throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
- }
- }
-
- /**
- * Writes a row map to byte array.
- *
- * @param map Map a row id to a binary row.
- * @return Array of bytes.
- */
- public static byte[] rowMapToBytes(Map<RowId, BinaryRow> map) {
- if (map == null) {
- return null;
- }
-
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
- oos.writeInt(map.size());
-
- for (Map.Entry<RowId, BinaryRow> e : map.entrySet()) {
- oos.writeShort((short) e.getKey().partitionId());
- oos.writeLong(e.getKey().mostSignificantBits());
- oos.writeLong(e.getKey().leastSignificantBits());
-
- if (e.getValue() == null) {
- oos.writeInt(0);
- } else {
- byte[] bytes = rowToBytes(e.getValue());
-
- oos.writeInt(bytes.length);
- oos.write(bytes);
- }
- }
-
- }
-
- baos.flush();
-
- return baos.toByteArray();
- } catch (IOException e) {
- LOG.debug("Unable to write the row map to stream [map={}]", e, map);
-
- throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
- }
- }
-
- /**
- * Reads a row map from bytes.
- *
- * @param bytes Bytes to read.
- * @param consumer Consumer for a key and a value for the map entry.
- */
- public static void readRowMap(byte[] bytes, BiConsumer<RowId, BinaryRow> consumer) {
- if (bytes == null || bytes.length == 0) {
- return;
- }
-
- try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
- try (ObjectInputStream ois = new ObjectInputStream(bais)) {
- int size = ois.readInt();
-
- for (int i = 0; i < size; i++) {
- int partId = ois.readShort() & 0xFFFF;
- long mostSignificantBits = ois.readLong();
- long leastSignificantBits = ois.readLong();
-
- RowId id = new RowId(partId, mostSignificantBits, leastSignificantBits);
-
- int len = ois.readInt();
-
- BinaryRow row = null;
-
- if (len != 0) {
- byte[] rowBytes = new byte[len];
-
- int read = ois.read(rowBytes);
-
- assert read == len;
-
- row = new ByteBufferRow(rowBytes);
- }
-
- consumer.accept(id, row);
- }
- }
- } catch (IOException e) {
- LOG.debug("Unable to read row map from stream [bytes={}]", e, bytes);
-
- throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
- }
- }
-
- /**
- * Serializes an integer to the byte array.
- *
- * @param i Integer value.
- * @return Byte array.
- */
- private static byte[] intToBytes(int i) {
- byte[] arr = new byte[4];
- ByteBuffer.wrap(arr).putInt(i);
- return arr;
- }
-
- /**
- * Deserializes a byte array to the integer.
- *
- * @param bytes Byte array.
- * @return Integer value.
- */
- private static int bytesToInt(byte[] bytes) {
- return ByteBuffer.wrap(bytes).getInt();
- }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
deleted file mode 100644
index b96758fd3a..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.command.response;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.table.distributed.command.CommandUtils;
-import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
-
-/**
- * This class represents a response object that contains a collection {@link BinaryRow} from a batch operation.
- *
- * @see UpdateAllCommand
- */
-public class MultiRowsResponse implements Serializable {
- /** Binary rows. */
- private transient List<BinaryRow> rows;
-
- /*
- * Row bytes.
- * It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after (IGNITE-14793).
- */
- private byte[] rowsBytes;
-
- /**
- * Creates a new instance of MultiRowsResponse with the given collection of binary rows.
- *
- * @param rows Collection of binary rows.
- */
- public MultiRowsResponse(List<BinaryRow> rows) {
- this.rows = rows;
-
- rowsBytes = CommandUtils.rowsToBytes(rows);
- }
-
- /**
- * Returns binary rows.
- */
- public List<BinaryRow> getValues() {
- if (rows == null && rowsBytes != null) {
- rows = new ArrayList<>();
-
- CommandUtils.readRows(rowsBytes, rows::add);
- }
-
- return rows;
- }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
deleted file mode 100644
index e44c7994e7..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.command.response;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.table.distributed.command.CommandUtils;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * This class represents a response object message that contains a single {@link BinaryRow}.
- */
-public class SingleRowResponse implements Serializable {
- /** Binary row. */
- @Nullable
- private transient BinaryRow row;
-
- /**
- * Row bytes. It is a temporary solution, before network have not implement correct serialization BinaryRow.
- * TODO: Remove the field after (IGNITE-14793).
- */
- private byte[] rowBytes;
-
- /**
- * Creates a new instance of SingleRowResponse with the given binary row.
- *
- * @param row Binary row.
- */
- public SingleRowResponse(@Nullable BinaryRow row) {
- this.row = row;
-
- rowBytes = CommandUtils.rowToBytes(row);
- }
-
- /**
- * Returns binary row.
- */
- @Nullable
- public BinaryRow getValue() {
- if (row == null && rowBytes != null) {
- row = new ByteBufferRow(rowBytes);
- }
-
- return row;
- }
-}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
index 815ecd14e5..a658999c56 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
@@ -21,7 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
-import java.io.ByteArrayOutputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;
@@ -268,23 +267,14 @@ public class NumericTypesSerializerTest {
long randomKey = rnd.nextLong();
- final Tuple firstTup = createTuple().set("key", randomKey).set("decimalCol", pair.getFirst());
- final Tuple secondTup = createTuple().set("key", randomKey).set("decimalCol", pair.getSecond());
+ Tuple firstTup = createTuple().set("key", randomKey).set("decimalCol", pair.getFirst());
+ Tuple secondTup = createTuple().set("key", randomKey).set("decimalCol", pair.getSecond());
- final Row firstRow = marshaller.marshal(firstTup);
- final Row secondRow = marshaller.marshal(secondTup);
+ Row firstRow = marshaller.marshal(firstTup);
+ Row secondRow = marshaller.marshal(secondTup);
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
-
- firstRow.writeTo(stream);
-
- byte[] firstRowInBytes = stream.toByteArray();
-
- stream.reset();
-
- secondRow.writeTo(stream);
-
- byte[] secondRowInBytes = stream.toByteArray();
+ byte[] firstRowInBytes = firstRow.bytes();
+ byte[] secondRowInBytes = secondRow.bytes();
assertArrayEquals(firstRowInBytes, secondRowInBytes);
}