You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vp...@apache.org on 2022/12/19 15:28:14 UTC

[ignite-3] branch main updated: IGNITE-14793 Removed temporary messages serialization code (#1457)

This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new de69515565 IGNITE-14793 Removed temporary messages serialization code (#1457)
de69515565 is described below

commit de69515565be0d97af237dd84d70563a61f86d7a
Author: Vadim Pakhnushev <86...@users.noreply.github.com>
AuthorDate: Mon Dec 19 18:28:08 2022 +0300

    IGNITE-14793 Removed temporary messages serialization code (#1457)
---
 .../apache/ignite/internal/schema/BinaryRow.java   |  10 -
 .../ignite/internal/schema/ByteBufferRow.java      |  14 --
 .../org/apache/ignite/internal/schema/row/Row.java |  13 --
 .../ignite/internal/table/ItColocationTest.java    |   3 +-
 .../table/distributed/command/CommandUtils.java    | 258 ---------------------
 .../command/response/MultiRowsResponse.java        |  66 ------
 .../command/response/SingleRowResponse.java        |  62 -----
 .../table/type/NumericTypesSerializerTest.java     |  22 +-
 8 files changed, 7 insertions(+), 441 deletions(-)

diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
index af762964f9..5556948f42 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/BinaryRow.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.schema;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
 /**
@@ -82,14 +80,6 @@ public interface BinaryRow {
      */
     ByteBuffer valueSlice();
 
-    /**
-     * Writes binary row to given stream.
-     *
-     * @param stream Stream to write to.
-     * @throws IOException If write operation fails.
-     */
-    void writeTo(OutputStream stream) throws IOException;
-
     /**
      * Get byte array of the row.
      */
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
index 925e216240..a7bdc9e8d2 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/ByteBufferRow.java
@@ -17,12 +17,8 @@
 
 package org.apache.ignite.internal.schema;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.nio.channels.Channels;
-import java.nio.channels.WritableByteChannel;
 
 /**
  * Heap byte buffer-based row.
@@ -74,16 +70,6 @@ public class ByteBufferRow implements BinaryRow {
         return buf.getInt(KEY_HASH_FIELD_OFFSET);
     }
 
-    /** {@inheritDoc} */
-    @Override
-    public void writeTo(OutputStream stream) throws IOException {
-        WritableByteChannel channel = Channels.newChannel(stream);
-
-        channel.write(buf);
-
-        buf.rewind();
-    }
-
     /** {@inheritDoc} */
     @Override
     public ByteBuffer keySlice() {
diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
index 6f134bc834..67072320ef 100644
--- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
+++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/row/Row.java
@@ -17,8 +17,6 @@
 
 package org.apache.ignite.internal.schema.row;
 
-import java.io.IOException;
-import java.io.OutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -770,17 +768,6 @@ public class Row implements BinaryRowEx, SchemaAware, InternalTuple {
         return row.valueSlice();
     }
 
-    /**
-     * Writes binary row to given stream.
-     *
-     * @param stream Stream to write to.
-     * @throws IOException If write operation fails.
-     */
-    @Override
-    public void writeTo(OutputStream stream) throws IOException {
-        row.writeTo(stream);
-    }
-
     /**
      * Read bytes by offset.
      *
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 629018f065..ec3ac28631 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -77,7 +77,6 @@ import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import org.apache.ignite.internal.table.distributed.TableMessagesFactory;
 import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
 import org.apache.ignite.internal.table.distributed.command.UpdateCommand;
-import org.apache.ignite.internal.table.distributed.command.response.MultiRowsResponse;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteMultiRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replication.request.ReadWriteSingleRowReplicaRequest;
 import org.apache.ignite.internal.table.distributed.replicator.TablePartitionId;
@@ -174,7 +173,7 @@ public class ItColocationTest {
                 });
 
                 if (cmd instanceof UpdateAllCommand) {
-                    return completedFuture(new MultiRowsResponse(List.of()).getValues());
+                    return completedFuture(List.of());
                 } else {
                     return completedFuture(true);
                 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
deleted file mode 100644
index 8f5a1a9bf3..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/CommandUtils.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.command;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Map;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.storage.RowId;
-import org.apache.ignite.lang.ErrorGroups.Common;
-import org.apache.ignite.lang.IgniteInternalException;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * This is an utility class for serialization cache tuples. It will be removed after another way for serialization is implemented into the
- * network layer.
- * TODO: Remove it after (IGNITE-14793)
- */
-public class CommandUtils {
-    /** The logger. */
-    private static final IgniteLogger LOG = Loggers.forClass(CommandUtils.class);
-
-    /**
-     * Writes a list of rows to byte array.
-     *
-     * @param rows     Collection of rows.
-     * @return         Rows data.
-     */
-    public static byte[] rowsToBytes(Collection<BinaryRow> rows) {
-        if (rows == null || rows.isEmpty()) {
-            return null;
-        }
-
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            for (BinaryRow row : rows) {
-                if (row == null) {
-                    baos.write(intToBytes(0));
-                } else {
-                    byte[] bytes = rowToBytes(row);
-
-                    baos.write(intToBytes(bytes.length));
-                    baos.write(bytes);
-                }
-            }
-
-            baos.flush();
-
-            return baos.toByteArray();
-        } catch (IOException e) {
-            LOG.debug("unable to write rows to stream [rowsCount={}]", e, rows.size());
-
-            throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
-        }
-    }
-
-    /**
-     * Writes a row to byte array.
-     *
-     * @param row      Row.
-     * @return         Row bytes.
-     */
-    public static byte[] rowToBytes(@Nullable BinaryRow row) {
-        if (row == null) {
-            return null;
-        }
-
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            row.writeTo(baos);
-
-            baos.flush();
-
-            return baos.toByteArray();
-        } catch (IOException e) {
-            LOG.debug("Unable to write row to stream [row={}]", e, row);
-
-            throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
-        }
-    }
-
-    /**
-     * Reads the keys from a byte array.
-     *
-     * @param bytes    Byte array.
-     * @param consumer Consumer for binary row.
-     */
-    public static void readRows(byte[] bytes, Consumer<BinaryRow> consumer) {
-        if (bytes == null || bytes.length == 0) {
-            return;
-        }
-
-        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
-            byte[] lenBytes = new byte[4];
-
-            byte[] rowBytes;
-
-            int read;
-
-            while ((read = bais.read(lenBytes)) != -1) {
-                assert read == 4;
-
-                int len = bytesToInt(lenBytes);
-
-                if (len == 0) {
-                    consumer.accept(null);
-
-                    continue;
-                }
-
-                rowBytes = new byte[len];
-
-                read = bais.read(rowBytes);
-
-                assert read == len;
-
-                consumer.accept(new ByteBufferRow(rowBytes));
-            }
-        } catch (IOException e) {
-            LOG.debug("Unable to read rows from stream", e);
-
-            throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
-        }
-    }
-
-    /**
-     * Writes a row map to byte array.
-     *
-     * @param map Map a row id to a binary row.
-     * @return Array of bytes.
-     */
-    public static byte[] rowMapToBytes(Map<RowId, BinaryRow> map) {
-        if (map == null) {
-            return null;
-        }
-
-        try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
-            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
-                oos.writeInt(map.size());
-
-                for (Map.Entry<RowId, BinaryRow> e : map.entrySet()) {
-                    oos.writeShort((short) e.getKey().partitionId());
-                    oos.writeLong(e.getKey().mostSignificantBits());
-                    oos.writeLong(e.getKey().leastSignificantBits());
-
-                    if (e.getValue() == null) {
-                        oos.writeInt(0);
-                    } else {
-                        byte[] bytes = rowToBytes(e.getValue());
-
-                        oos.writeInt(bytes.length);
-                        oos.write(bytes);
-                    }
-                }
-
-            }
-
-            baos.flush();
-
-            return baos.toByteArray();
-        } catch (IOException e) {
-            LOG.debug("Unable to write the row map to stream [map={}]", e, map);
-
-            throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
-        }
-    }
-
-    /**
-     * Reads a row map from bytes.
-     *
-     * @param bytes Bytes to read.
-     * @param consumer Consumer for a key and a value for the map entry.
-     */
-    public static void readRowMap(byte[] bytes, BiConsumer<RowId, BinaryRow> consumer) {
-        if (bytes == null || bytes.length == 0) {
-            return;
-        }
-
-        try (ByteArrayInputStream bais = new ByteArrayInputStream(bytes)) {
-            try (ObjectInputStream ois = new ObjectInputStream(bais)) {
-                int size = ois.readInt();
-
-                for (int i = 0; i < size; i++) {
-                    int  partId = ois.readShort() & 0xFFFF;
-                    long mostSignificantBits = ois.readLong();
-                    long leastSignificantBits = ois.readLong();
-
-                    RowId id = new RowId(partId, mostSignificantBits, leastSignificantBits);
-
-                    int len = ois.readInt();
-
-                    BinaryRow row = null;
-
-                    if (len != 0) {
-                        byte[] rowBytes = new byte[len];
-
-                        int read = ois.read(rowBytes);
-
-                        assert read == len;
-
-                        row = new ByteBufferRow(rowBytes);
-                    }
-
-                    consumer.accept(id, row);
-                }
-            }
-        } catch (IOException e) {
-            LOG.debug("Unable to read row map from stream [bytes={}]", e, bytes);
-
-            throw new IgniteInternalException(Common.UNEXPECTED_ERR, e);
-        }
-    }
-
-    /**
-     * Serializes an integer to the byte array.
-     *
-     * @param i Integer value.
-     * @return Byte array.
-     */
-    private static byte[] intToBytes(int i) {
-        byte[] arr = new byte[4];
-        ByteBuffer.wrap(arr).putInt(i);
-        return arr;
-    }
-
-    /**
-     * Deserializes a byte array to the integer.
-     *
-     * @param bytes Byte array.
-     * @return Integer value.
-     */
-    private static int bytesToInt(byte[] bytes) {
-        return ByteBuffer.wrap(bytes).getInt();
-    }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
deleted file mode 100644
index b96758fd3a..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/MultiRowsResponse.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.command.response;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.table.distributed.command.CommandUtils;
-import org.apache.ignite.internal.table.distributed.command.UpdateAllCommand;
-
-/**
- * This class represents a response object that contains a collection {@link BinaryRow} from a batch operation.
- *
- * @see UpdateAllCommand
- */
-public class MultiRowsResponse implements Serializable {
-    /** Binary rows. */
-    private transient List<BinaryRow> rows;
-
-    /*
-     * Row bytes.
-     * It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after (IGNITE-14793).
-     */
-    private byte[] rowsBytes;
-
-    /**
-     * Creates a new instance of MultiRowsResponse with the given collection of binary rows.
-     *
-     * @param rows Collection of binary rows.
-     */
-    public MultiRowsResponse(List<BinaryRow> rows) {
-        this.rows = rows;
-
-        rowsBytes = CommandUtils.rowsToBytes(rows);
-    }
-
-    /**
-     * Returns binary rows.
-     */
-    public List<BinaryRow> getValues() {
-        if (rows == null && rowsBytes != null) {
-            rows = new ArrayList<>();
-
-            CommandUtils.readRows(rowsBytes, rows::add);
-        }
-
-        return rows;
-    }
-}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
deleted file mode 100644
index e44c7994e7..0000000000
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/command/response/SingleRowResponse.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.distributed.command.response;
-
-import java.io.Serializable;
-import org.apache.ignite.internal.schema.BinaryRow;
-import org.apache.ignite.internal.schema.ByteBufferRow;
-import org.apache.ignite.internal.table.distributed.command.CommandUtils;
-import org.jetbrains.annotations.Nullable;
-
-/**
- * This class represents a response object message that contains a single {@link BinaryRow}.
- */
-public class SingleRowResponse implements Serializable {
-    /** Binary row. */
-    @Nullable
-    private transient BinaryRow row;
-
-    /**
-     * Row bytes. It is a temporary solution, before network have not implement correct serialization BinaryRow.
-     * TODO: Remove the field after (IGNITE-14793).
-     */
-    private byte[] rowBytes;
-
-    /**
-     * Creates a new instance of SingleRowResponse with the given binary row.
-     *
-     * @param row Binary row.
-     */
-    public SingleRowResponse(@Nullable BinaryRow row) {
-        this.row = row;
-
-        rowBytes = CommandUtils.rowToBytes(row);
-    }
-
-    /**
-     * Returns binary row.
-     */
-    @Nullable
-    public BinaryRow getValue() {
-        if (row == null && rowBytes != null) {
-            row = new ByteBufferRow(rowBytes);
-        }
-
-        return row;
-    }
-}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
index 815ecd14e5..a658999c56 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/type/NumericTypesSerializerTest.java
@@ -21,7 +21,6 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
-import java.io.ByteArrayOutputStream;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.math.RoundingMode;
@@ -268,23 +267,14 @@ public class NumericTypesSerializerTest {
 
         long randomKey = rnd.nextLong();
 
-        final Tuple firstTup = createTuple().set("key", randomKey).set("decimalCol", pair.getFirst());
-        final Tuple secondTup = createTuple().set("key", randomKey).set("decimalCol", pair.getSecond());
+        Tuple firstTup = createTuple().set("key", randomKey).set("decimalCol", pair.getFirst());
+        Tuple secondTup = createTuple().set("key", randomKey).set("decimalCol", pair.getSecond());
 
-        final Row firstRow = marshaller.marshal(firstTup);
-        final Row secondRow = marshaller.marshal(secondTup);
+        Row firstRow = marshaller.marshal(firstTup);
+        Row secondRow = marshaller.marshal(secondTup);
 
-        ByteArrayOutputStream stream = new ByteArrayOutputStream();
-
-        firstRow.writeTo(stream);
-
-        byte[] firstRowInBytes = stream.toByteArray();
-
-        stream.reset();
-
-        secondRow.writeTo(stream);
-
-        byte[] secondRowInBytes = stream.toByteArray();
+        byte[] firstRowInBytes = firstRow.bytes();
+        byte[] secondRowInBytes = secondRow.bytes();
 
         assertArrayEquals(firstRowInBytes, secondRowInBytes);
     }