You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2023/10/30 09:38:26 UTC
(ignite) branch master updated: IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict()
This is an automated email from the ASF dual-hosted git repository.
av pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8c8c919220d IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict()
8c8c919220d is described below
commit 8c8c919220dd910a6891dfd0795cef63b56974b6
Author: Anton Vinogradov <av...@apache.org>
AuthorDate: Mon Oct 30 12:38:13 2023 +0300
IGNITE-20746 Cache objects transformation never happen on TcpIgniteClient.putAllConflict()
---
.../CacheObjectCompressionConsumptionTest.java | 26 +++++++++++++++++++---
.../platform/client/ClientMessageParser.java | 4 ++--
.../cache/ClientCachePutAllConflictRequest.java | 4 ++--
.../cache/ClientCacheRemoveAllConflictRequest.java | 2 +-
.../streamer/ClientDataStreamerAddDataRequest.java | 5 ++---
.../client/streamer/ClientDataStreamerReader.java | 7 +++---
.../streamer/ClientDataStreamerStartRequest.java | 5 ++---
.../processors/platform/utils/PlatformUtils.java | 8 +++++--
8 files changed, 42 insertions(+), 19 deletions(-)
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
index 38ce530e530..5aabe2baf2b 100644
--- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
+++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/transform/CacheObjectCompressionConsumptionTest.java
@@ -19,7 +19,9 @@ package org.apache.ignite.internal.processors.cache.transform;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.function.Function;
import org.apache.commons.io.FileUtils;
import org.apache.ignite.DataRegionMetrics;
@@ -34,8 +36,11 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.communication.CommunicationSpi;
import org.apache.ignite.spi.metric.LongMetric;
@@ -285,7 +290,7 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo
Ignite prim = primaryNode(0, CACHE_NAME);
- if (mode == ConsumptionTestMode.THIN_CLIENT) {
+ if (mode == ConsumptionTestMode.THIN_CLIENT || mode == ConsumptionTestMode.THIN_CLIENT_INTERNAL_API) {
String host = prim.configuration().getLocalHost();
int port = prim.configuration().getClientConnectorConfiguration().getPort();
@@ -296,7 +301,19 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo
Object key = keyGen.apply(i);
Object val = valGen.apply(i);
- cache.put(key, val);
+ if (mode == ConsumptionTestMode.THIN_CLIENT)
+ cache.put(key, val);
+ else {
+ assert mode == ConsumptionTestMode.THIN_CLIENT_INTERNAL_API;
+
+ Map<Object, T3<Object, GridCacheVersion, Long>> data = new HashMap<>();
+
+ GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 0);
+
+ data.put(key, new T3<>(val, otherVer, 0L));
+
+ ((TcpClientCache)cache).putAllConflict(data);
+ }
assertEqualsArraysAware(cache.get(key), val);
}
@@ -335,7 +352,7 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo
clNet += reg.<LongMetric>findMetric(SENT_BYTES_METRIC_NAME).value();
clNet += reg.<LongMetric>findMetric(RECEIVED_BYTES_METRIC_NAME).value();
- if (mode != ConsumptionTestMode.THIN_CLIENT)
+ if (mode != ConsumptionTestMode.THIN_CLIENT && mode != ConsumptionTestMode.THIN_CLIENT_INTERNAL_API)
assertEquals(0, clNet);
net += clNet;
@@ -406,6 +423,9 @@ public class CacheObjectCompressionConsumptionTest extends AbstractCacheObjectCo
/** Thin client. */
THIN_CLIENT,
+ /** Thin client uses internal API. */
+ THIN_CLIENT_INTERNAL_API,
+
/** Node + Persistent. */
PERSISTENT
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index dbd62bc0e28..22cc257e660 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -627,10 +627,10 @@ public class ClientMessageParser implements ClientListenerMessageParser {
return new ClientServiceGetDescriptorRequest(reader);
case OP_DATA_STREAMER_START:
- return new ClientDataStreamerStartRequest(reader);
+ return new ClientDataStreamerStartRequest(reader, ctx);
case OP_DATA_STREAMER_ADD_DATA:
- return new ClientDataStreamerAddDataRequest(reader);
+ return new ClientDataStreamerAddDataRequest(reader, ctx);
case OP_ATOMIC_LONG_CREATE:
return new ClientAtomicLongCreateRequest(reader);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
index ac3182b751b..4701f654e66 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
@@ -60,8 +60,8 @@ public class ClientCachePutAllConflictRequest extends ClientCacheDataRequest imp
map = new LinkedHashMap<>(cnt);
for (int i = 0; i < cnt; i++) {
- KeyCacheObject key = readCacheObject(reader, true);
- CacheObject val = readCacheObject(reader, false);
+ KeyCacheObject key = readCacheObject(reader, true, ctx);
+ CacheObject val = readCacheObject(reader, false, ctx);
GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
long expireTime = reader.readLong();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
index c6e1a35f1d2..168585bb099 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
@@ -51,7 +51,7 @@ public class ClientCacheRemoveAllConflictRequest extends ClientCacheDataRequest
map = new LinkedHashMap<>(cnt);
for (int i = 0; i < cnt; i++) {
- KeyCacheObject key = readCacheObject(reader, true);
+ KeyCacheObject key = readCacheObject(reader, true, null);
GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
map.put(key, ver);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java
index 8e17a51a7ea..9eaaec9e2e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerAddDataRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.platform.client.streamer;
import java.util.Collection;
-
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
@@ -48,12 +47,12 @@ public class ClientDataStreamerAddDataRequest extends ClientDataStreamerRequest
*
* @param reader Data reader.
*/
- public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader) {
+ public ClientDataStreamerAddDataRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
super(reader);
streamerId = reader.readLong();
flags = reader.readByte();
- entries = ClientDataStreamerReader.read(reader);
+ entries = ClientDataStreamerReader.read(reader, ctx);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
index 39a973df3c5..da1ea65bef8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
import java.util.Collection;
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
@@ -34,7 +35,7 @@ class ClientDataStreamerReader {
* @param reader Data reader.
* @return Streamer entry.
*/
- public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader) {
+ public static Collection<DataStreamerEntry> read(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
int entriesCnt = reader.readInt();
if (entriesCnt == 0)
@@ -43,8 +44,8 @@ class ClientDataStreamerReader {
Collection<DataStreamerEntry> entries = new ArrayList<>(entriesCnt);
for (int i = 0; i < entriesCnt; i++) {
- entries.add(new DataStreamerEntry(readCacheObject(reader, true),
- readCacheObject(reader, false)));
+ entries.add(new DataStreamerEntry(readCacheObject(reader, true, ctx),
+ readCacheObject(reader, false, ctx)));
}
return entries;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
index b8cd231aeaa..7c96f423e69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerStartRequest.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.processors.platform.client.streamer;
import java.util.Collection;
-
import org.apache.ignite.IgniteException;
import org.apache.ignite.binary.BinaryObject;
import org.apache.ignite.internal.GridKernalContext;
@@ -73,7 +72,7 @@ public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest {
*
* @param reader Data reader.
*/
- public ClientDataStreamerStartRequest(BinaryReaderExImpl reader) {
+ public ClientDataStreamerStartRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
super(reader);
cacheId = reader.readInt();
@@ -82,7 +81,7 @@ public class ClientDataStreamerStartRequest extends ClientDataStreamerRequest {
perThreadBufferSize = reader.readInt();
receiverObj = reader.readObjectDetached();
receiverPlatform = receiverObj == null ? 0 : reader.readByte();
- entries = ClientDataStreamerReader.read(reader);
+ entries = ClientDataStreamerReader.read(reader, ctx);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 8462a5d24fe..94b99943d89 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -68,6 +68,7 @@ import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
import org.apache.ignite.internal.processors.platform.PlatformNativeException;
import org.apache.ignite.internal.processors.platform.PlatformProcessor;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetServiceImpl;
import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
@@ -1373,8 +1374,9 @@ public class PlatformUtils {
*
* @param reader Reader.
* @param isKey {@code True} if object is a key.
+ * @param ctx Client connection context.
*/
- public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
+ public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey, ClientConnectionContext ctx) {
BinaryInputStream in = reader.in();
int pos0 = in.position();
@@ -1393,7 +1395,9 @@ public class PlatformUtils {
byte[] objBytes = in.readByteArray(pos1 - pos0);
- return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
+ return isKey ?
+ (T)new KeyCacheObjectImpl(obj, objBytes, -1) :
+ (T)new CacheObjectImpl(obj, ctx.kernalContext().transformer() == null ? objBytes : null);
}
/**