You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2023/10/30 09:38:26 UTC

(ignite) branch master updated: IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict()

This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c8c919220d IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict()
8c8c919220d is described below

commit 8c8c919220dd910a6891dfd0795cef63b56974b6
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Mon Oct 30 12:38:13 2023 +0300

    IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict()
---
 .../CacheObjectCompressionConsumptionTest.java     | 26 +++++++++++++++++++---
 .../platform/client/ClientMessageParser.java       |  4 ++--
 .../cache/ClientCachePutAllConflictRequest.java    |  4 ++--
 .../cache/ClientCacheRemoveAllConflictRequest.java |  2 +-
 .../streamer/ClientDataStreamerAddDataRequest.java |  5 ++---
 .../client/streamer/ClientDataStreamerReader.java  |  7 +++---
 .../streamer/ClientDataStreamerStartRequest.java   |  5 ++---
 .../processors/platform/utils/PlatformUtils.java   |  8 +++++--
 8 files changed, 42 insertions(+), 19 deletions(-)

diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
index 38ce530e530..5aabe2baf2b 100644
--- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.transform;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.function.Function;
 import org.apache.commons.io.FileUtils;
 import org.apache.ignite.DataRegionMetrics;
@@ -34,8 +36,11 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T3;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.spi.communication.CommunicationSpi;
 import org.apache.ignite.spi.metric.LongMetric;
@@ -285,7 +290,7 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo
 
             Ignite prim = primaryNode(0, CACHE_NAME);
 
-            if (mode == ConsumptionTestMode.THIN_CLIENT) {
+            if (mode == ConsumptionTestMode.THIN_CLIENT || mode == ConsumptionTestMode.THIN_CLIENT_INTERNAL_API) {
                 String host = prim.configuration().getLocalHost();
                 int port = prim.configuration().getClientConnectorConfiguration().getPort();
 
@@ -296,7 +301,19 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo
                         Object key = keyGen.apply(i);
                         Object val = valGen.apply(i);
 
-                        cache.put(key, val);
+                        if (mode == ConsumptionTestMode.THIN_CLIENT)
+                            cache.put(key, val);
+                        else {
+                            assert mode == ConsumptionTestMode.THIN_CLIENT_INTERNAL_API;
+
+                            Map<Object, T3<Object, GridCacheVersion, Long>> data = new HashMap<>();
+
+                            GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 0);
+
+                            data.put(key, new T3<>(val, otherVer, 0L));
+
+                            ((TcpClientCache)cache).putAllConflict(data);
+                        }
 
                         assertEqualsArraysAware(cache.get(key), val);
                     }
@@ -335,7 +352,7 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo
                 clNet += reg.<LongMetric>findMetric(SENT_BYTES_METRIC_NAME).value();
                 clNet += reg.<LongMetric>findMetric(RECEIVED_BYTES_METRIC_NAME).value();
 
-                if (mode != ConsumptionTestMode.THIN_CLIENT)
+                if (mode != ConsumptionTestMode.THIN_CLIENT && mode != ConsumptionTestMode.THIN_CLIENT_INTERNAL_API)
                     assertEquals(0, clNet);
 
                 net += clNet;
@@ -406,6 +423,9 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo
         /** Thin client. */
         THIN_CLIENT,
 
+        /** Thin client uses internal API. */
+        THIN_CLIENT_INTERNAL_API,
+
         /** Node + Persistent. */
         PERSISTENT
     }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index dbd62bc0e28..22cc257e660 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -627,10 +627,10 @@ public class ClientMessageParser implements ClientListenerMessageParser {
                 return new ClientServiceGetDescriptorRequest(reader);
 
             case OP_DATA_STREAMER_START:
-                return new ClientDataStreamerStartRequest(reader);
+                return new ClientDataStreamerStartRequest(reader, ctx);
 
             case OP_DATA_STREAMER_ADD_DATA:
-                return new ClientDataStreamerAddDataRequest(reader);
+                return new ClientDataStreamerAddDataRequest(reader, ctx);
 
             case OP_ATOMIC_LONG_CREATE:
                 return new ClientAtomicLongCreateRequest(reader);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
index ac3182b751b..4701f654e66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
@@ -60,8 +60,8 @@ public class ClientCachePutAllConflictRequest extends ClientCacheDataRequest imp
         map = new LinkedHashMap<>(cnt);
 
         for (int i = 0; i < cnt; i++) {
-            KeyCacheObject key = readCacheObject(reader, true);
-            CacheObject val = readCacheObject(reader, false);
+            KeyCacheObject key = readCacheObject(reader, true, ctx);
+            CacheObject val = readCacheObject(reader, false, ctx);
             GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
             long expireTime = reader.readLong();
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
index c6e1a35f1d2..168585bb099 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
@@ -51,7 +51,7 @@ public class ClientCacheRemoveAllConflictRequest extends ClientCacheDataRequest
         map = new LinkedHashMap<>(cnt);
 
         for (int i = 0; i < cnt; i++) {
-            KeyCacheObject key = readCacheObject(reader, true);
+            KeyCacheObject key = readCacheObject(reader, true, null);
             GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
 
             map.put(key, ver);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java
index 8e17a51a7ea..9eaaec9e2e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.platform.client.streamer;
 
 import java.util.Collection;
-
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.processors.cache.CacheObject;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -48,12 +47,12 @@ public class ClientDataStreamerAddDataRequest extends ClientDataStreamerRequest
      *
      * @param reader Data reader.
      */
-    public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader) {
+    public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
         super(reader);
 
         streamerId = reader.readLong();
         flags = reader.readByte();
-        entries = ClientDataStreamerReader.read(reader);
+        entries = ClientDataStreamerReader.read(reader, ctx);
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
index 39a973df3c5..da1ea65bef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 
 import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
 
@@ -34,7 +35,7 @@ class ClientDataStreamerReader {
      * @param reader Data reader.
      * @return Streamer entry.
      */
-    public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader) {
+    public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
         int entriesCnt = reader.readInt();
 
         if (entriesCnt == 0)
@@ -43,8 +44,8 @@ class ClientDataStreamerReader {
         Collection<DataStreamerEntry> entries = new ArrayList<>(entriesCnt);
 
         for (int i = 0; i < entriesCnt; i++) {
-            entries.add(new DataStreamerEntry(readCacheObject(reader, true),
-                    readCacheObject(reader, false)));
+            entries.add(new DataStreamerEntry(readCacheObject(reader, true, ctx),
+                    readCacheObject(reader, false, ctx)));
         }
 
         return entries;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
index b8cd231aeaa..7c96f423e69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.processors.platform.client.streamer;
 
 import java.util.Collection;
-
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.internal.GridKernalContext;
@@ -73,7 +72,7 @@ public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest {
      *
      * @param reader Data reader.
      */
-    public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) {
+    public ClientDataStreamerStartRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
         super(reader);
 
         cacheId = reader.readInt();
@@ -82,7 +81,7 @@ public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest {
         perThreadBufferSize = reader.readInt();
         receiverObj = reader.readObjectDetached();
         receiverPlatform = receiverObj == null ? 0 : reader.readByte();
-        entries = ClientDataStreamerReader.read(reader);
+        entries = ClientDataStreamerReader.read(reader, ctx);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 8462a5d24fe..94b99943d89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
 import org.apache.ignite.internal.processors.platform.PlatformNativeException;
 import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl;
 import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
 import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
@@ -1373,8 +1374,9 @@ public class PlatformUtils {
      *
      * @param reader Reader.
      * @param isKey {@code True} if object is a key.
+     * @param ctx Client connection context.
      */
-    public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
+    public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey, ClientConnectionContext ctx) {
         BinaryInputStream in = reader.in();
 
         int pos0 = in.position();
@@ -1393,7 +1395,9 @@ public class PlatformUtils {
 
         byte[] objBytes = in.readByteArray(pos1 - pos0);
 
-        return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
+        return isKey ?
+            (T)new KeyCacheObjectImpl(obj, objBytes, -1) :
+            (T)new CacheObjectImpl(obj, ctx.kernalContext().transformer() == null ? objBytes : null);
     }
 
     /**