You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2022/08/04 07:40:07 UTC
[ignite] branch master updated: IGNITE-15000 Implemented data replication thin client operations (#9915)
This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 3acf3f1098d IGNITE-15000 Implemented data replication thin client operations (#9915)
3acf3f1098d is described below
commit 3acf3f1098d38ddea6d7f3d55c0d0ec2dafad17d
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Thu Aug 4 10:39:55 2022 +0300
IGNITE-15000 Implemented data replication thin client operations (#9915)
---
.../clients/AbstractClientCompatibilityTest.java | 3 +
.../clients/JavaThinCompatibilityTest.java | 35 +++++
.../internal/client/thin/ClientOperation.java | 6 +
.../client/thin/ProtocolBitmaskFeature.java | 5 +-
.../internal/client/thin/TcpClientCache.java | 98 ++++++++++++-
.../platform/client/ClientBitmaskFeature.java | 6 +-
.../platform/client/ClientMessageParser.java | 14 ++
.../cache/ClientCachePutAllConflictRequest.java | 85 +++++++++++
.../cache/ClientCacheRemoveAllConflictRequest.java | 72 +++++++++
.../platform/client/cache/ClientCacheRequest.java | 13 ++
.../client/streamer/ClientDataStreamerReader.java | 32 +---
.../processors/platform/utils/PlatformUtils.java | 33 +++++
.../org/apache/ignite/client/ReliabilityTest.java | 2 +-
.../client/thin/DataReplicationOperationsTest.java | 161 +++++++++++++++++++++
.../org/apache/ignite/client/ClientTestSuite.java | 4 +-
15 files changed, 534 insertions(+), 35 deletions(-)
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
index e6cb4954f54..4d04a10322c 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
@@ -72,6 +72,9 @@ public abstract class AbstractClientCompatibilityTest extends IgniteCompatibilit
/** Version 2.13.0. */
protected static final IgniteProductVersion VER_2_13_0 = IgniteProductVersion.fromString("2.13.0");
+ /** Version 2.14.0. */
+ protected static final IgniteProductVersion VER_2_14_0 = IgniteProductVersion.fromString("2.14.0");
+
/** Parameters. */
@Parameterized.Parameters(name = "Version {0}")
public static Iterable<Object[]> versions() {
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index 6f52705c802..7b725088f2f 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -51,8 +51,11 @@ import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.ClientConnectorConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.platform.PlatformType;
@@ -429,6 +432,9 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
testServicesWithCallerContextThrows();
}
}
+
+ if (clientVer.compareTo(VER_2_14_0) >= 0)
+ testDataReplicationOperations(serverVer.compareTo(VER_2_14_0) >= 0);
}
/** */
@@ -473,6 +479,35 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
}
}
+ /** @param supported {@code True} if feature supported. */
+ private void testDataReplicationOperations(boolean supported) {
+ X.println(">>>> Testing cache replication");
+
+ try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(ADDR))) {
+ TcpClientCache<Object, Object> cache = (TcpClientCache<Object, Object>)client
+ .getOrCreateCache("test-cache-replication");
+
+ Map<Object, T2<Object, GridCacheVersion>> puts = F.asMap(1, new T2<>(1, new GridCacheVersion(1, 1, 1, 2)));
+
+ Map<Object, GridCacheVersion> rmvs = F.asMap(1, new GridCacheVersion(1, 1, 1, 2));
+
+ if (supported) {
+ cache.putAllConflict(puts);
+
+ assertEquals(1, cache.get(1));
+
+ cache.removeAllConflict(rmvs);
+
+ assertFalse(cache.containsKey(1));
+ }
+ else {
+ assertThrowsWithCause(() -> cache.putAllConflict(puts), ClientFeatureNotSupportedByServerException.class);
+
+ assertThrowsWithCause(() -> cache.removeAllConflict(rmvs), ClientFeatureNotSupportedByServerException.class);
+ }
+ }
+ }
+
/** */
public static interface EchoServiceInterface {
/** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index e37403bf3aa..c3c6491d3fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -117,6 +117,12 @@ public enum ClientOperation {
/** Cache clear keys. */
CACHE_CLEAR_KEYS(1015),
+ /** Cache put all conflict. */
+ CACHE_PUT_ALL_CONFLICT(1022),
+
+ /** Cache remove all conflict. */
+ CACHE_REMOVE_ALL_CONFLICT(1023),
+
/** Cache partitions. */
CACHE_PARTITIONS(1101),
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index bf035f46ac6..0f437251562 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -59,7 +59,10 @@ public enum ProtocolBitmaskFeature {
SERVICE_INVOKE_CALLCTX(10),
/** Handle OP_HEARTBEAT and OP_GET_IDLE_TIMEOUT. */
- HEARTBEAT(11);
+ HEARTBEAT(11),
+
+ /** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
+ DATA_REPLICATION_OPERATIONS(12);
/** */
private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 99f9eeae1a1..4efa4e5183b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -42,11 +42,14 @@ import org.apache.ignite.client.ClientCache;
import org.apache.ignite.client.ClientCacheConfiguration;
import org.apache.ignite.client.ClientDisconnectListener;
import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
import org.apache.ignite.client.IgniteClientFuture;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
import org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
@@ -57,7 +60,7 @@ import static org.apache.ignite.internal.processors.platform.cache.expiry.Platfo
/**
* Implementation of {@link ClientCache} over TCP protocol.
*/
-class TcpClientCache<K, V> implements ClientCache<K, V> {
+public class TcpClientCache<K, V> implements ClientCache<K, V> {
/** "Keep binary" flag mask. */
private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
@@ -863,6 +866,54 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
U.closeQuiet(hnd);
}
+ /**
+ * Store DR data.
+ *
+ * @param drMap DR map.
+ */
+ public void putAllConflict(Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> drMap) throws ClientException {
+ A.notNull(drMap, "drMap");
+
+ ch.request(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
+ }
+
+ /**
+ * Store DR data asynchronously.
+ *
+ * @param drMap DR map.
+ * @return Future.
+ */
+ public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, T2<? extends V, GridCacheVersion>> drMap)
+ throws ClientException {
+ A.notNull(drMap, "drMap");
+
+ return ch.requestAsync(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
+ }
+
+ /**
+ * Removes DR data.
+ *
+ * @param drMap DR map.
+ */
+ public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws ClientException {
+ A.notNull(drMap, "drMap");
+
+ ch.request(ClientOperation.CACHE_REMOVE_ALL_CONFLICT, req -> writeRemoveAllConflict(drMap, req));
+ }
+
+ /**
+ * Removes DR data asynchronously.
+ *
+ * @param drMap DR map.
+ * @return Future.
+ */
+ public IgniteClientFuture<Void> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap)
+ throws ClientException {
+ A.notNull(drMap, "drMap");
+
+ return ch.requestAsync(ClientOperation.CACHE_REMOVE_ALL_CONFLICT, req -> writeRemoveAllConflict(drMap, req));
+ }
+
/** Handle scan query. */
private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
@@ -1065,4 +1116,49 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
serDes.writeObject(out, e.getValue());
});
}
+
+ /** */
+ private void writePutAllConflict(
+ Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> map,
+ PayloadOutputChannel req
+ ) {
+ checkDataReplicationSupported(req.clientChannel().protocolCtx());
+
+ writeCacheInfo(req);
+
+ ClientUtils.collection(
+ map.entrySet(),
+ req.out(),
+ (out, e) -> {
+ serDes.writeObject(out, e.getKey());
+ serDes.writeObject(out, e.getValue().get1());
+ serDes.writeObject(out, e.getValue().get2());
+ });
+ }
+
+ /** */
+ private void writeRemoveAllConflict(Map<? extends K, GridCacheVersion> map, PayloadOutputChannel req) {
+ checkDataReplicationSupported(req.clientChannel().protocolCtx());
+
+ writeCacheInfo(req);
+
+ ClientUtils.collection(
+ map.entrySet(),
+ req.out(),
+ (out, e) -> {
+ serDes.writeObject(out, e.getKey());
+ serDes.writeObject(out, e.getValue());
+ });
+ }
+
+ /**
+ * Check that data replication operations is supported by server.
+ *
+ * @param protocolCtx Protocol context.
+ */
+ private void checkDataReplicationSupported(ProtocolContext protocolCtx)
+ throws ClientFeatureNotSupportedByServerException {
+ if (!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS))
+ throw new ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index ea243f39bc1..88a25fe7df3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.client;
import java.util.EnumSet;
import org.apache.ignite.client.ClientServices;
import org.apache.ignite.internal.ThinProtocolFeature;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
/**
* Defines supported features for thin client.
@@ -59,7 +60,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature {
SERVICE_INVOKE_CALLCTX(10),
/** Handle OP_HEARTBEAT and OP_GET_IDLE_TIMEOUT. */
- HEARTBEAT(11);
+ HEARTBEAT(11),
+
+ /** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
+ DATA_REPLICATION_OPERATIONS(12);
/** */
private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 15312fdf74d..0156e855bb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -56,11 +56,13 @@ import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGe
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheLocalPeekRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheNodePartitionsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePartitionsRequest;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutAllConflictRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutAllRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutIfAbsentRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryContinuousRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryNextPageRequest;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveAllConflictRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveAllRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveIfEqualsRequest;
import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveKeyRequest;
@@ -189,6 +191,12 @@ public class ClientMessageParser implements ClientListenerMessageParser {
/** */
private static final short OP_CACHE_LOCAL_PEEK = 1021;
+ /** */
+ private static final short OP_CACHE_PUT_ALL_CONFLICT = 1022;
+
+ /** */
+ private static final short OP_CACHE_REMOVE_ALL_CONFLICT = 1023;
+
/* Cache create / destroy, configuration. */
/** */
private static final short OP_CACHE_GET_NAMES = 1050;
@@ -522,6 +530,12 @@ public class ClientMessageParser implements ClientListenerMessageParser {
case OP_CACHE_REMOVE_ALL:
return new ClientCacheRemoveAllRequest(reader);
+ case OP_CACHE_PUT_ALL_CONFLICT:
+ return new ClientCachePutAllConflictRequest(reader, ctx);
+
+ case OP_CACHE_REMOVE_ALL_CONFLICT:
+ return new ClientCacheRemoveAllConflictRequest(reader);
+
case OP_CACHE_CREATE_WITH_NAME:
return new ClientCacheCreateWithNameRequest(reader);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
new file mode 100644
index 00000000000..288839c40be
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
+/**
+ * Client {@link TcpClientCache#putAllConflict(Map)} request.
+ */
+public class ClientCachePutAllConflictRequest extends ClientCacheDataRequest implements ClientTxAwareRequest {
+ /** */
+ private final Map<KeyCacheObject, GridCacheDrInfo> map;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ * @param ctx Connection context.
+ */
+ public ClientCachePutAllConflictRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
+ super(reader);
+
+ boolean expPlc = cachex(ctx).configuration().getExpiryPolicyFactory() != null;
+
+ int cnt = reader.readInt();
+
+ map = new LinkedHashMap<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ KeyCacheObject key = readCacheObject(reader, true);
+ CacheObject val = readCacheObject(reader, false);
+ GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
+
+ GridCacheDrInfo info = expPlc ?
+ new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE) :
+ new GridCacheDrInfo(val, ver);
+
+ map.put(key, info);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ try {
+ cachex(ctx).putAllConflict(map);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ return super.process(ctx);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
new file mode 100644
index 00000000000..c6e1a35f1d2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
+/**
+ * Client {@link TcpClientCache#removeAllConflict(Map)} request.
+ */
+public class ClientCacheRemoveAllConflictRequest extends ClientCacheDataRequest implements ClientTxAwareRequest {
+ /** */
+ private final Map<KeyCacheObject, GridCacheVersion> map;
+
+ /**
+ * Constructor.
+ *
+ * @param reader Reader.
+ */
+ public ClientCacheRemoveAllConflictRequest(BinaryReaderExImpl reader) {
+ super(reader);
+
+ int cnt = reader.readInt();
+
+ map = new LinkedHashMap<>(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ KeyCacheObject key = readCacheObject(reader, true);
+ GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
+
+ map.put(key, ver);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClientResponse process(ClientConnectionContext ctx) {
+ try {
+ cachex(ctx).removeAllConflict(map);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+
+ return super.process(ctx);
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
index cfb20d9e5ce..5881ab39a10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
@@ -21,6 +21,7 @@ import javax.cache.expiry.ExpiryPolicy;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryRawReader;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
import org.apache.ignite.internal.processors.platform.client.ClientRequest;
@@ -76,6 +77,18 @@ public class ClientCacheRequest extends ClientRequest {
return rawCache(ctx).withKeepBinary();
}
+ /**
+ * Gets the internal cache implementation, with binary mode enabled.
+ *
+ * @param ctx Kernal context.
+ * @return Cache.
+ */
+ protected IgniteInternalCache<?, ?> cachex(ClientConnectionContext ctx) {
+ String cacheName = cacheDescriptor(ctx).cacheName();
+
+ return ctx.kernalContext().grid().cachex(cacheName).keepBinary();
+ }
+
/**
* Gets a value indicating whether keepBinary flag is set in this request.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
index 25bdae272cb..39a973df3c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
@@ -19,14 +19,11 @@ package org.apache.ignite.internal.processors.platform.client.streamer;
import java.util.ArrayList;
import java.util.Collection;
-
import org.apache.ignite.internal.binary.BinaryReaderExImpl;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
/**
* Data streamer deserialization helpers.
*/
@@ -52,29 +49,4 @@ class ClientDataStreamerReader {
return entries;
}
-
- /**
- * Read cache object from the stream as raw bytes to avoid marshalling.
- */
- private static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
- BinaryInputStream in = reader.in();
-
- int pos0 = in.position();
-
- Object obj = reader.readObjectDetached();
-
- if (obj == null)
- return null;
-
- if (obj instanceof CacheObject)
- return (T)obj;
-
- int pos1 = in.position();
-
- in.position(pos0);
-
- byte[] objBytes = in.readByteArray(pos1 - pos0);
-
- return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
- }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 06d55f4a27d..8462a5d24fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -53,11 +53,16 @@ import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
import org.apache.ignite.internal.binary.BinaryRawReaderEx;
import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
import org.apache.ignite.internal.binary.BinarySchema;
import org.apache.ignite.internal.binary.BinarySchemaRegistry;
import org.apache.ignite.internal.binary.BinaryTypeImpl;
import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.processors.platform.PlatformContext;
import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
@@ -1363,6 +1368,34 @@ public class PlatformUtils {
return "";
}
+ /**
+ * Read cache object from the stream as raw bytes to avoid marshalling.
+ *
+ * @param reader Reader.
+ * @param isKey {@code True} if object is a key.
+ */
+ public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
+ BinaryInputStream in = reader.in();
+
+ int pos0 = in.position();
+
+ Object obj = reader.readObjectDetached();
+
+ if (obj == null)
+ return null;
+
+ if (obj instanceof CacheObject)
+ return (T)obj;
+
+ int pos1 = in.position();
+
+ in.position(pos0);
+
+ byte[] objBytes = in.readByteArray(pos1 - pos0);
+
+ return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
+ }
+
/**
* Private constructor.
*/
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 075fd146477..7e25d7b67e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -316,7 +316,7 @@ public class ReliabilityTest extends AbstractThinClientTest {
String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
- long expectedNullCount = 16;
+ long expectedNullCount = 18;
String msg = nullOps.size()
+ " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
new file mode 100644
index 00000000000..b68ca7eeddd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Data replication operations test.
+ */
+@RunWith(Parameterized.class)
+public class DataReplicationOperationsTest extends AbstractThinClientTest {
+ /** Keys count. */
+ private static final int KEYS_CNT = 10;
+
+ /** */
+ private static IgniteClient client;
+
+ /** */
+ private static TcpClientCache<Object, Object> cache;
+
+ /** */
+ private final GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 2);
+
+ /** {@code True} if operate with binary objects. */
+ @Parameterized.Parameter
+ public boolean binary;
+
+ /** @return Test parameters. */
+ @Parameterized.Parameters(name = "binary={0}")
+ public static Collection<Object[]> parameters() {
+ return cartesianProduct(F.asList(false, true));
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ super.beforeTestsStarted();
+
+ startGrid();
+
+ client = startClient(grid());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ grid().destroyCaches(grid().cacheNames());
+
+ cache = (TcpClientCache<Object, Object>)client.createCache(DEFAULT_CACHE_NAME);
+
+ if (binary)
+ cache = (TcpClientCache<Object, Object>)cache.withKeepBinary();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ super.afterTestsStopped();
+
+ client.close();
+ }
+
+ /** */
+ @Test
+ public void testPutAllConflict() {
+ Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+
+ cache.putAllConflict(data);
+
+ data.forEach((key, val) -> assertEquals(val.get1(), cache.get(key)));
+ }
+
+ /** */
+ @Test
+ public void testRemoveAllConflict() {
+ for (int i = 0; i < KEYS_CNT; i++)
+ cache.put(new Person(i, "Person-" + i), new Person(i, "Person-" + i));
+
+ Map<Object, GridCacheVersion> map = new HashMap<>();
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ Person key = new Person(i, "Person-" + i);
+
+ map.put(binary ? client.binary().toBinary(key) : key, otherVer);
+ }
+
+ cache.removeAllConflict(map);
+
+ map.keySet().forEach(key -> assertFalse(cache.containsKey(key)));
+ }
+
+ /** @throws Exception If fails. */
+ @Test
+ public void testWithExpiryPolicy() throws Exception {
+ PlatformExpiryPolicy expPlc = new PlatformExpiryPolicy(1000, 1000, 1000);
+
+ ClientCacheConfiguration ccfgWithExpPlc = new ClientCacheConfiguration()
+ .setName("cache-with-expiry-policy")
+ .setExpiryPolicy(expPlc);
+
+ TcpClientCache<Object, Object> cache = (TcpClientCache<Object, Object>)client.getOrCreateCache(ccfgWithExpPlc);
+
+ TcpClientCache<Object, Object> cacheWithExpPlc = binary ?
+ (TcpClientCache<Object, Object>)cache.withKeepBinary() : cache;
+
+ Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+
+ cacheWithExpPlc.putAllConflict(data);
+
+ assertTrue(cacheWithExpPlc.containsKeys(data.keySet()));
+
+ assertTrue(waitForCondition(
+ () -> data.keySet().stream().noneMatch(cacheWithExpPlc::containsKey),
+ 2 * expPlc.getExpiryForCreation().getDurationAmount()
+ ));
+ }
+
+ /** */
+ private Map<Object, T2<Object, GridCacheVersion>> createPutAllData() {
+ Map<Object, T2<Object, GridCacheVersion>> map = new HashMap<>();
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ Person key = new Person(i, "Person-" + i);
+ Person val = new Person(i, "Person-" + i);
+
+ map.put(binary ? client.binary().toBinary(key) : key,
+ new T2<>(binary ? client.binary().toBinary(val) : val, otherVer));
+ }
+
+ return map;
+ }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index aeb99154e8f..693c09ea02e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.client.thin.CacheEntryListenersTest;
import org.apache.ignite.internal.client.thin.ClusterApiTest;
import org.apache.ignite.internal.client.thin.ClusterGroupTest;
import org.apache.ignite.internal.client.thin.ComputeTaskTest;
+import org.apache.ignite.internal.client.thin.DataReplicationOperationsTest;
import org.apache.ignite.internal.client.thin.IgniteSetTest;
import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
import org.apache.ignite.internal.client.thin.ReliableChannelTest;
@@ -73,7 +74,8 @@ import org.junit.runners.Suite;
OptimizedMarshallerClassesCachedTest.class,
AtomicLongTest.class,
BinaryConfigurationTest.class,
- IgniteSetTest.class
+ IgniteSetTest.class,
+ DataReplicationOperationsTest.class
})
public class ClientTestSuite {
// No-op.