You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2022/08/04 07:40:07 UTC

[ignite] branch master updated: IGNITE-15000 Implemented data replication thin client operations (#9915)

This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 3acf3f1098d IGNITE-15000 Implemented data replication thin client operations (#9915)
3acf3f1098d is described below

commit 3acf3f1098d38ddea6d7f3d55c0d0ec2dafad17d
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Thu Aug 4 10:39:55 2022 +0300

    IGNITE-15000 Implemented data replication thin client operations (#9915)
---
 .../clients/AbstractClientCompatibilityTest.java   |   3 +
 .../clients/JavaThinCompatibilityTest.java         |  35 +++++
 .../internal/client/thin/ClientOperation.java      |   6 +
 .../client/thin/ProtocolBitmaskFeature.java        |   5 +-
 .../internal/client/thin/TcpClientCache.java       |  98 ++++++++++++-
 .../platform/client/ClientBitmaskFeature.java      |   6 +-
 .../platform/client/ClientMessageParser.java       |  14 ++
 .../cache/ClientCachePutAllConflictRequest.java    |  85 +++++++++++
 .../cache/ClientCacheRemoveAllConflictRequest.java |  72 +++++++++
 .../platform/client/cache/ClientCacheRequest.java  |  13 ++
 .../client/streamer/ClientDataStreamerReader.java  |  32 +---
 .../processors/platform/utils/PlatformUtils.java   |  33 +++++
 .../org/apache/ignite/client/ReliabilityTest.java  |   2 +-
 .../client/thin/DataReplicationOperationsTest.java | 161 +++++++++++++++++++++
 .../org/apache/ignite/client/ClientTestSuite.java  |   4 +-
 15 files changed, 534 insertions(+), 35 deletions(-)

diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
index e6cb4954f54..4d04a10322c 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/AbstractClientCompatibilityTest.java
@@ -72,6 +72,9 @@ public abstract class AbstractClientCompatibilityTest extends IgniteCompatibilit
     /** Version 2.13.0. */
     protected static final IgniteProductVersion VER_2_13_0 = IgniteProductVersion.fromString("2.13.0");
 
+    /** Version 2.14.0. */
+    protected static final IgniteProductVersion VER_2_14_0 = IgniteProductVersion.fromString("2.14.0");
+
     /** Parameters. */
     @Parameterized.Parameters(name = "Version {0}")
     public static Iterable<Object[]> versions() {
diff --git a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
index 6f52705c802..7b725088f2f 100644
--- a/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
+++ b/modules/compatibility/src/test/java/org/apache/ignite/compatibility/clients/JavaThinCompatibilityTest.java
@@ -51,8 +51,11 @@ import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.ClientConnectorConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.ThinClientConfiguration;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.lang.IgniteProductVersion;
 import org.apache.ignite.platform.PlatformType;
@@ -429,6 +432,9 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
                 testServicesWithCallerContextThrows();
             }
         }
+
+        if (clientVer.compareTo(VER_2_14_0) >= 0)
+            testDataReplicationOperations(serverVer.compareTo(VER_2_14_0) >= 0);
     }
 
     /** */
@@ -473,6 +479,35 @@ public class JavaThinCompatibilityTest extends AbstractClientCompatibilityTest {
         }
     }
 
+    /** @param supported {@code True} if feature supported. */
+    private void testDataReplicationOperations(boolean supported) {
+        X.println(">>>> Testing cache replication");
+
+        try (IgniteClient client = Ignition.startClient(new ClientConfiguration().setAddresses(ADDR))) {
+            TcpClientCache<Object, Object> cache = (TcpClientCache<Object, Object>)client
+                .getOrCreateCache("test-cache-replication");
+
+            Map<Object, T2<Object, GridCacheVersion>> puts = F.asMap(1, new T2<>(1, new GridCacheVersion(1, 1, 1, 2)));
+
+            Map<Object, GridCacheVersion> rmvs = F.asMap(1, new GridCacheVersion(1, 1, 1, 2));
+
+            if (supported) {
+                cache.putAllConflict(puts);
+
+                assertEquals(1, cache.get(1));
+
+                cache.removeAllConflict(rmvs);
+
+                assertFalse(cache.containsKey(1));
+            }
+            else {
+                assertThrowsWithCause(() -> cache.putAllConflict(puts), ClientFeatureNotSupportedByServerException.class);
+
+                assertThrowsWithCause(() -> cache.removeAllConflict(rmvs), ClientFeatureNotSupportedByServerException.class);
+            }
+        }
+    }
+
     /** */
     public static interface EchoServiceInterface {
         /** */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
index e37403bf3aa..c3c6491d3fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ClientOperation.java
@@ -117,6 +117,12 @@ public enum ClientOperation {
     /** Cache clear keys. */
     CACHE_CLEAR_KEYS(1015),
 
+    /** Cache put all conflict. */
+    CACHE_PUT_ALL_CONFLICT(1022),
+
+    /** Cache remove all conflict. */
+    CACHE_REMOVE_ALL_CONFLICT(1023),
+
     /** Cache partitions. */
     CACHE_PARTITIONS(1101),
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
index bf035f46ac6..0f437251562 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/ProtocolBitmaskFeature.java
@@ -59,7 +59,10 @@ public enum ProtocolBitmaskFeature {
     SERVICE_INVOKE_CALLCTX(10),
 
     /** Handle OP_HEARTBEAT and OP_GET_IDLE_TIMEOUT. */
-    HEARTBEAT(11);
+    HEARTBEAT(11),
+
+    /** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
+    DATA_REPLICATION_OPERATIONS(12);
 
     /** */
     private static final EnumSet<ProtocolBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
index 99f9eeae1a1..4efa4e5183b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/thin/TcpClientCache.java
@@ -42,11 +42,14 @@ import org.apache.ignite.client.ClientCache;
 import org.apache.ignite.client.ClientCacheConfiguration;
 import org.apache.ignite.client.ClientDisconnectListener;
 import org.apache.ignite.client.ClientException;
+import org.apache.ignite.client.ClientFeatureNotSupportedByServerException;
 import org.apache.ignite.client.IgniteClientFuture;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
 import org.apache.ignite.internal.binary.streams.BinaryInputStream;
 import org.apache.ignite.internal.binary.streams.BinaryOutputStream;
 import org.apache.ignite.internal.client.thin.TcpClientTransactions.TcpClientTransaction;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.jetbrains.annotations.Nullable;
@@ -57,7 +60,7 @@ import static org.apache.ignite.internal.processors.platform.cache.expiry.Platfo
 /**
  * Implementation of {@link ClientCache} over TCP protocol.
  */
-class TcpClientCache<K, V> implements ClientCache<K, V> {
+public class TcpClientCache<K, V> implements ClientCache<K, V> {
     /** "Keep binary" flag mask. */
     private static final byte KEEP_BINARY_FLAG_MASK = 0x01;
 
@@ -863,6 +866,54 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
         U.closeQuiet(hnd);
     }
 
+    /**
+     * Store DR data.
+     *
+     * @param drMap DR map.
+     */
+    public void putAllConflict(Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> drMap) throws ClientException {
+        A.notNull(drMap, "drMap");
+
+        ch.request(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
+    }
+
+    /**
+     * Store DR data asynchronously.
+     *
+     * @param drMap DR map.
+     * @return Future.
+     */
+    public IgniteClientFuture<Void> putAllConflictAsync(Map<? extends K, T2<? extends V, GridCacheVersion>> drMap)
+        throws ClientException {
+        A.notNull(drMap, "drMap");
+
+        return ch.requestAsync(ClientOperation.CACHE_PUT_ALL_CONFLICT, req -> writePutAllConflict(drMap, req));
+    }
+
+    /**
+     * Removes DR data.
+     *
+     * @param drMap DR map.
+     */
+    public void removeAllConflict(Map<? extends K, GridCacheVersion> drMap) throws ClientException {
+        A.notNull(drMap, "drMap");
+
+        ch.request(ClientOperation.CACHE_REMOVE_ALL_CONFLICT, req -> writeRemoveAllConflict(drMap, req));
+    }
+
+    /**
+     * Removes DR data asynchronously.
+     *
+     * @param drMap DR map.
+     * @return Future.
+     */
+    public IgniteClientFuture<Void> removeAllConflictAsync(Map<? extends K, GridCacheVersion> drMap)
+        throws ClientException {
+        A.notNull(drMap, "drMap");
+
+        return ch.requestAsync(ClientOperation.CACHE_REMOVE_ALL_CONFLICT, req -> writeRemoveAllConflict(drMap, req));
+    }
+
     /** Handle scan query. */
     private QueryCursor<Cache.Entry<K, V>> scanQuery(ScanQuery<K, V> qry) {
         Consumer<PayloadOutputChannel> qryWriter = payloadCh -> {
@@ -1065,4 +1116,49 @@ class TcpClientCache<K, V> implements ClientCache<K, V> {
                     serDes.writeObject(out, e.getValue());
                 });
     }
+
+    /** */
+    private void writePutAllConflict(
+        Map<? extends K, ? extends T2<? extends V, GridCacheVersion>> map,
+        PayloadOutputChannel req
+    ) {
+        checkDataReplicationSupported(req.clientChannel().protocolCtx());
+
+        writeCacheInfo(req);
+
+        ClientUtils.collection(
+            map.entrySet(),
+            req.out(),
+            (out, e) -> {
+                serDes.writeObject(out, e.getKey());
+                serDes.writeObject(out, e.getValue().get1());
+                serDes.writeObject(out, e.getValue().get2());
+            });
+    }
+
+    /** */
+    private void writeRemoveAllConflict(Map<? extends K, GridCacheVersion> map, PayloadOutputChannel req) {
+        checkDataReplicationSupported(req.clientChannel().protocolCtx());
+
+        writeCacheInfo(req);
+
+        ClientUtils.collection(
+            map.entrySet(),
+            req.out(),
+            (out, e) -> {
+                serDes.writeObject(out, e.getKey());
+                serDes.writeObject(out, e.getValue());
+            });
+    }
+
+    /**
+     * Check that data replication operations is supported by server.
+     *
+     * @param protocolCtx Protocol context.
+     */
+    private void checkDataReplicationSupported(ProtocolContext protocolCtx)
+        throws ClientFeatureNotSupportedByServerException {
+        if (!protocolCtx.isFeatureSupported(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS))
+            throw new ClientFeatureNotSupportedByServerException(ProtocolBitmaskFeature.DATA_REPLICATION_OPERATIONS);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
index ea243f39bc1..88a25fe7df3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientBitmaskFeature.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.processors.platform.client;
 import java.util.EnumSet;
 import org.apache.ignite.client.ClientServices;
 import org.apache.ignite.internal.ThinProtocolFeature;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
 
 /**
  * Defines supported features for thin client.
@@ -59,7 +60,10 @@ public enum ClientBitmaskFeature implements ThinProtocolFeature {
     SERVICE_INVOKE_CALLCTX(10),
 
     /** Handle OP_HEARTBEAT and OP_GET_IDLE_TIMEOUT. */
-    HEARTBEAT(11);
+    HEARTBEAT(11),
+
+    /** Data replication operations: {@link TcpClientCache#putAllConflict}, {@link TcpClientCache#removeAllConflict}. */
+    DATA_REPLICATION_OPERATIONS(12);
 
     /** */
     private static final EnumSet<ClientBitmaskFeature> ALL_FEATURES_AS_ENUM_SET =
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
index 15312fdf74d..0156e855bb4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/ClientMessageParser.java
@@ -56,11 +56,13 @@ import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheGe
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheLocalPeekRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheNodePartitionsRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePartitionsRequest;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutAllConflictRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutAllRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutIfAbsentRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCachePutRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryContinuousRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheQueryNextPageRequest;
+import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveAllConflictRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveAllRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveIfEqualsRequest;
 import org.apache.ignite.internal.processors.platform.client.cache.ClientCacheRemoveKeyRequest;
@@ -189,6 +191,12 @@ public class ClientMessageParser implements ClientListenerMessageParser {
     /** */
     private static final short OP_CACHE_LOCAL_PEEK = 1021;
 
+    /** */
+    private static final short OP_CACHE_PUT_ALL_CONFLICT = 1022;
+
+    /** */
+    private static final short OP_CACHE_REMOVE_ALL_CONFLICT = 1023;
+
     /* Cache create / destroy, configuration. */
     /** */
     private static final short OP_CACHE_GET_NAMES = 1050;
@@ -522,6 +530,12 @@ public class ClientMessageParser implements ClientListenerMessageParser {
             case OP_CACHE_REMOVE_ALL:
                 return new ClientCacheRemoveAllRequest(reader);
 
+            case OP_CACHE_PUT_ALL_CONFLICT:
+                return new ClientCachePutAllConflictRequest(reader, ctx);
+
+            case OP_CACHE_REMOVE_ALL_CONFLICT:
+                return new ClientCacheRemoveAllConflictRequest(reader);
+
             case OP_CACHE_CREATE_WITH_NAME:
                 return new ClientCacheCreateWithNameRequest(reader);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
new file mode 100644
index 00000000000..288839c40be
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCachePutAllConflictRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.EXPIRE_TIME_CALCULATE;
+import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_NOT_CHANGED;
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
+/**
+ * Client {@link TcpClientCache#putAllConflict(Map)} request.
+ */
+public class ClientCachePutAllConflictRequest extends ClientCacheDataRequest implements ClientTxAwareRequest {
+    /** */
+    private final Map<KeyCacheObject, GridCacheDrInfo> map;
+
+    /**
+     * Constructor.
+     *
+     * @param reader Reader.
+     * @param ctx Connection context.
+     */
+    public ClientCachePutAllConflictRequest(BinaryReaderExImpl reader, ClientConnectionContext ctx) {
+        super(reader);
+
+        boolean expPlc = cachex(ctx).configuration().getExpiryPolicyFactory() != null;
+
+        int cnt = reader.readInt();
+
+        map = new LinkedHashMap<>(cnt);
+
+        for (int i = 0; i < cnt; i++) {
+            KeyCacheObject key = readCacheObject(reader, true);
+            CacheObject val = readCacheObject(reader, false);
+            GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
+
+            GridCacheDrInfo info = expPlc ?
+                new GridCacheDrExpirationInfo(val, ver, TTL_NOT_CHANGED, EXPIRE_TIME_CALCULATE) :
+                new GridCacheDrInfo(val, ver);
+
+            map.put(key, info);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientResponse process(ClientConnectionContext ctx) {
+        try {
+            cachex(ctx).putAllConflict(map);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+
+        return super.process(ctx);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
new file mode 100644
index 00000000000..c6e1a35f1d2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRemoveAllConflictRequest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.client.cache;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
+import org.apache.ignite.internal.client.thin.TcpClientCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
+import org.apache.ignite.internal.processors.platform.client.ClientResponse;
+import org.apache.ignite.internal.processors.platform.client.tx.ClientTxAwareRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
+/**
+ * Client {@link TcpClientCache#removeAllConflict(Map)} request.
+ */
+public class ClientCacheRemoveAllConflictRequest extends ClientCacheDataRequest implements ClientTxAwareRequest {
+    /** */
+    private final Map<KeyCacheObject, GridCacheVersion> map;
+
+    /**
+     * Constructor.
+     *
+     * @param reader Reader.
+     */
+    public ClientCacheRemoveAllConflictRequest(BinaryReaderExImpl reader) {
+        super(reader);
+
+        int cnt = reader.readInt();
+
+        map = new LinkedHashMap<>(cnt);
+
+        for (int i = 0; i < cnt; i++) {
+            KeyCacheObject key = readCacheObject(reader, true);
+            GridCacheVersion ver = (GridCacheVersion)reader.readObjectDetached();
+
+            map.put(key, ver);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClientResponse process(ClientConnectionContext ctx) {
+        try {
+            cachex(ctx).removeAllConflict(map);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+
+        return super.process(ctx);
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
index cfb20d9e5ce..5881ab39a10 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/cache/ClientCacheRequest.java
@@ -21,6 +21,7 @@ import javax.cache.expiry.ExpiryPolicy;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.binary.BinaryRawReader;
 import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
 import org.apache.ignite.internal.processors.platform.client.ClientConnectionContext;
 import org.apache.ignite.internal.processors.platform.client.ClientRequest;
@@ -76,6 +77,18 @@ public class ClientCacheRequest extends ClientRequest {
         return rawCache(ctx).withKeepBinary();
     }
 
+    /**
+     * Gets the internal cache implementation, with binary mode enabled.
+     *
+     * @param ctx Kernal context.
+     * @return Cache.
+     */
+    protected IgniteInternalCache<?, ?> cachex(ClientConnectionContext ctx) {
+        String cacheName = cacheDescriptor(ctx).cacheName();
+
+        return ctx.kernalContext().grid().cachex(cacheName).keepBinary();
+    }
+
     /**
      * Gets a value indicating whether keepBinary flag is set in this request.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
index 25bdae272cb..39a973df3c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/client/streamer/ClientDataStreamerReader.java
@@ -19,14 +19,11 @@ package org.apache.ignite.internal.processors.platform.client.streamer;
 
 import java.util.ArrayList;
 import java.util.Collection;
-
 import org.apache.ignite.internal.binary.BinaryReaderExImpl;
-import org.apache.ignite.internal.binary.streams.BinaryInputStream;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
-import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry;
 
+import static org.apache.ignite.internal.processors.platform.utils.PlatformUtils.readCacheObject;
+
 /**
  * Data streamer deserialization helpers.
  */
@@ -52,29 +49,4 @@ class ClientDataStreamerReader {
 
         return entries;
     }
-
-    /**
-     * Read cache object from the stream as raw bytes to avoid marshalling.
-     */
-    private static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
-        BinaryInputStream in = reader.in();
-
-        int pos0 = in.position();
-
-        Object obj = reader.readObjectDetached();
-
-        if (obj == null)
-            return null;
-
-        if (obj instanceof CacheObject)
-            return (T)obj;
-
-        int pos1 = in.position();
-
-        in.position(pos0);
-
-        byte[] objBytes = in.readByteArray(pos1 - pos0);
-
-        return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
-    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 06d55f4a27d..8462a5d24fe 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -53,11 +53,16 @@ import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
 import org.apache.ignite.internal.binary.BinaryRawReaderEx;
 import org.apache.ignite.internal.binary.BinaryRawWriterEx;
+import org.apache.ignite.internal.binary.BinaryReaderExImpl;
 import org.apache.ignite.internal.binary.BinarySchema;
 import org.apache.ignite.internal.binary.BinarySchemaRegistry;
 import org.apache.ignite.internal.binary.BinaryTypeImpl;
 import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.binary.streams.BinaryInputStream;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.processors.platform.PlatformContext;
 import org.apache.ignite.internal.processors.platform.PlatformExtendedException;
@@ -1363,6 +1368,34 @@ public class PlatformUtils {
         return "";
     }
 
+    /**
+     * Read cache object from the stream as raw bytes to avoid marshalling.
+     *
+     * @param reader Reader.
+     * @param isKey {@code True} if object is a key.
+     */
+    public static <T extends CacheObject> T readCacheObject(BinaryReaderExImpl reader, boolean isKey) {
+        BinaryInputStream in = reader.in();
+
+        int pos0 = in.position();
+
+        Object obj = reader.readObjectDetached();
+
+        if (obj == null)
+            return null;
+
+        if (obj instanceof CacheObject)
+            return (T)obj;
+
+        int pos1 = in.position();
+
+        in.position(pos0);
+
+        byte[] objBytes = in.readByteArray(pos1 - pos0);
+
+        return isKey ? (T)new KeyCacheObjectImpl(obj, objBytes, -1) : (T)new CacheObjectImpl(obj, objBytes);
+    }
+
     /**
      * Private constructor.
      */
diff --git a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
index 075fd146477..7e25d7b67e4 100644
--- a/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/client/ReliabilityTest.java
@@ -316,7 +316,7 @@ public class ReliabilityTest extends AbstractThinClientTest {
 
         String nullOpsNames = nullOps.stream().map(Enum::name).collect(Collectors.joining(", "));
 
-        long expectedNullCount = 16;
+        long expectedNullCount = 18;
 
         String msg = nullOps.size()
                 + " operation codes do not have public equivalent. When adding new codes, update ClientOperationType too. Missing ops: "
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
new file mode 100644
index 00000000000..b68ca7eeddd
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/client/thin/DataReplicationOperationsTest.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.thin;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.client.ClientCacheConfiguration;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.Person;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.processors.platform.cache.expiry.PlatformExpiryPolicy;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.T2;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.testframework.GridTestUtils.cartesianProduct;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Data replication operations test.
+ */
+@RunWith(Parameterized.class)
+public class DataReplicationOperationsTest extends AbstractThinClientTest {
+    /** Keys count. */
+    private static final int KEYS_CNT = 10;
+
+    /** */
+    private static IgniteClient client;
+
+    /** */
+    private static TcpClientCache<Object, Object> cache;
+
+    /** */
+    private final GridCacheVersion otherVer = new GridCacheVersion(1, 1, 1, 2);
+
+    /** {@code True} if operate with binary objects. */
+    @Parameterized.Parameter
+    public boolean binary;
+
+    /** @return Test parameters. */
+    @Parameterized.Parameters(name = "binary={0}")
+    public static Collection<Object[]> parameters() {
+        return cartesianProduct(F.asList(false, true));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid();
+
+        client = startClient(grid());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        grid().destroyCaches(grid().cacheNames());
+
+        cache = (TcpClientCache<Object, Object>)client.createCache(DEFAULT_CACHE_NAME);
+
+        if (binary)
+            cache = (TcpClientCache<Object, Object>)cache.withKeepBinary();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        super.afterTestsStopped();
+
+        client.close();
+    }
+
+    /** */
+    @Test
+    public void testPutAllConflict() {
+        Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+
+        cache.putAllConflict(data);
+
+        data.forEach((key, val) -> assertEquals(val.get1(), cache.get(key)));
+    }
+
+    /** */
+    @Test
+    public void testRemoveAllConflict() {
+        for (int i = 0; i < KEYS_CNT; i++)
+            cache.put(new Person(i, "Person-" + i), new Person(i, "Person-" + i));
+
+        Map<Object, GridCacheVersion> map = new HashMap<>();
+
+        for (int i = 0; i < KEYS_CNT; i++) {
+            Person key = new Person(i, "Person-" + i);
+
+            map.put(binary ? client.binary().toBinary(key) : key, otherVer);
+        }
+
+        cache.removeAllConflict(map);
+
+        map.keySet().forEach(key -> assertFalse(cache.containsKey(key)));
+    }
+
+    /** @throws Exception If fails. */
+    @Test
+    public void testWithExpiryPolicy() throws Exception {
+        PlatformExpiryPolicy expPlc = new PlatformExpiryPolicy(1000, 1000, 1000);
+
+        ClientCacheConfiguration ccfgWithExpPlc = new ClientCacheConfiguration()
+            .setName("cache-with-expiry-policy")
+            .setExpiryPolicy(expPlc);
+
+        TcpClientCache<Object, Object> cache = (TcpClientCache<Object, Object>)client.getOrCreateCache(ccfgWithExpPlc);
+
+        TcpClientCache<Object, Object> cacheWithExpPlc = binary ?
+            (TcpClientCache<Object, Object>)cache.withKeepBinary() : cache;
+
+        Map<Object, T2<Object, GridCacheVersion>> data = createPutAllData();
+
+        cacheWithExpPlc.putAllConflict(data);
+
+        assertTrue(cacheWithExpPlc.containsKeys(data.keySet()));
+
+        assertTrue(waitForCondition(
+            () -> data.keySet().stream().noneMatch(cacheWithExpPlc::containsKey),
+            2 * expPlc.getExpiryForCreation().getDurationAmount()
+        ));
+    }
+
+    /** */
+    private Map<Object, T2<Object, GridCacheVersion>> createPutAllData() {
+        Map<Object, T2<Object, GridCacheVersion>> map = new HashMap<>();
+
+        for (int i = 0; i < KEYS_CNT; i++) {
+            Person key = new Person(i, "Person-" + i);
+            Person val = new Person(i, "Person-" + i);
+
+            map.put(binary ? client.binary().toBinary(key) : key,
+                new T2<>(binary ? client.binary().toBinary(val) : val, otherVer));
+        }
+
+        return map;
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
index aeb99154e8f..693c09ea02e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/client/ClientTestSuite.java
@@ -23,6 +23,7 @@ import org.apache.ignite.internal.client.thin.CacheEntryListenersTest;
 import org.apache.ignite.internal.client.thin.ClusterApiTest;
 import org.apache.ignite.internal.client.thin.ClusterGroupTest;
 import org.apache.ignite.internal.client.thin.ComputeTaskTest;
+import org.apache.ignite.internal.client.thin.DataReplicationOperationsTest;
 import org.apache.ignite.internal.client.thin.IgniteSetTest;
 import org.apache.ignite.internal.client.thin.OptimizedMarshallerClassesCachedTest;
 import org.apache.ignite.internal.client.thin.ReliableChannelTest;
@@ -73,7 +74,8 @@ import org.junit.runners.Suite;
     OptimizedMarshallerClassesCachedTest.class,
     AtomicLongTest.class,
     BinaryConfigurationTest.class,
-    IgniteSetTest.class
+    IgniteSetTest.class,
+    DataReplicationOperationsTest.class
 })
 public class ClientTestSuite {
     // No-op.