You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/07/14 07:14:19 UTC

[ignite-extensions] branch master updated: IGNITE-15116 Test of SQL query replication added. (#70)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 41991eb  IGNITE-15116 Test of SQL query replication added. (#70)
41991eb is described below

commit 41991eb6a733d52d7f47c423a0142e3c12d5fb0e
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Jul 14 10:14:14 2021 +0300

    IGNITE-15116 Test of SQL query replication added. (#70)
---
 .../org/apache/ignite/cdc/CdcEventsApplier.java    |  9 ++-
 .../apache/ignite/cdc/AbstractReplicationTest.java | 72 +++++++++++++++++++---
 .../cdc/CdcIgniteToIgniteReplicationTest.java      |  4 +-
 .../ignite/cdc/kafka/CdcKafkaReplicationTest.java  |  6 +-
 4 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index a8fe3ff..9137f2a 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -85,7 +85,7 @@ public abstract class CdcEventsApplier {
                         // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
                         ignite().cache(cacheName);
 
-                        return ignite().cachex(cacheName);
+                        return ignite().cachex(cacheName).keepBinary();
                     }
                 }
 
@@ -105,7 +105,12 @@ public abstract class CdcEventsApplier {
             if (evt.value() != null) {
                 applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
 
-                CacheObject val = new CacheObjectImpl(evt.value(), null);
+                CacheObject val;
+
+                if (evt.value() instanceof CacheObject)
+                    val = (CacheObject)evt.value();
+                else
+                    val = new CacheObjectImpl(evt.value(), null);
 
                 updBatch.put(key, new GridCacheDrInfo(val,
                     new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 17468df..7e46dc3 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -24,10 +24,12 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
+import java.util.function.Function;
 import java.util.stream.IntStream;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -36,6 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -144,13 +147,19 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
                 }}));
 
         if (!cfg.isClientMode()) {
-            CacheVersionConflictResolverPluginProvider<?> cfgPlugin = new CacheVersionConflictResolverPluginProvider<>();
+            CacheVersionConflictResolverPluginProvider<?> cfgPlugin1 = new CacheVersionConflictResolverPluginProvider<>();
 
-            cfgPlugin.setClusterId(clusterId);
-            cfgPlugin.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
-            cfgPlugin.setConflictResolveField("reqId");
+            cfgPlugin1.setClusterId(clusterId);
+            cfgPlugin1.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
+            cfgPlugin1.setConflictResolveField("reqId");
 
-            cfg.setPluginProviders(cfgPlugin);
+            CacheVersionConflictResolverPluginProvider<?> cfgPlugin2 = new CacheVersionConflictResolverPluginProvider<>();
+
+            cfgPlugin2.setClusterId(clusterId);
+            cfgPlugin2.setCaches(new HashSet<>(Arrays.asList("T1")));
+            cfgPlugin2.setConflictResolveField("ID");
+
+            cfg.setPluginProviders(cfgPlugin1);
 
             cfg.setDataStorageConfiguration(new DataStorageConfiguration()
                 .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
@@ -223,7 +232,7 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
     /** Active/Passive mode means changes made only in one cluster. */
     @Test
     public void testActivePassiveReplication() throws Exception {
-        List<IgniteInternalFuture<?>> futs = startActivePassiveCdc();
+        List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
 
         try {
             IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE);
@@ -252,6 +261,50 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
         }
     }
 
+    /** Checks active/passive replication of SQL data: inserts and deletes in T1 on the source cluster must reach the destination. */
+    @Test
+    public void testActivePassiveSqlDataReplication() throws Exception {
+        String createTbl = "CREATE TABLE T1(ID BIGINT PRIMARY KEY, NAME VARCHAR) WITH \"CACHE_NAME=T1,VALUE_TYPE=T1Type\"";
+        String insertQry = "INSERT INTO T1 VALUES(?, ?)";
+        String deleteQry = "DELETE FROM T1";
+
+        executeSql(srcCluster[0], createTbl);
+        executeSql(destCluster[0], createTbl);
+
+        executeSql(destCluster[0], insertQry, -1, "Name-1");
+        executeSql(destCluster[0], deleteQry);
+
+        IntStream.range(0, KEYS_CNT).forEach(id -> executeSql(srcCluster[0], insertQry, id, "Name" + id));
+
+        List<IgniteInternalFuture<?>> futs = startActivePassiveCdc("T1");
+
+        try {
+            Function<Integer, GridAbsPredicate> waitForTblSz = expSz -> () -> {
+                long cnt = (Long)executeSql(destCluster[0], "SELECT COUNT(*) FROM T1").get(0).get(0);
+
+                return cnt == expSz;
+            };
+
+            assertTrue(waitForCondition(waitForTblSz.apply(KEYS_CNT), getTestTimeout()));
+
+            // Row count matched; now verify the replicated rows themselves, in ID order.
+            List<List<?>> data = executeSql(destCluster[0], "SELECT ID, NAME FROM T1 ORDER BY ID");
+
+            for (int i = 0; i < KEYS_CNT; i++) {
+                assertEquals((long)i, data.get(i).get(0));
+                assertEquals("Name" + i, data.get(i).get(1));
+            }
+
+            executeSql(srcCluster[0], deleteQry);
+
+            assertTrue(waitForCondition(waitForTblSz.apply(0), getTestTimeout()));
+        }
+        finally {
+            for (IgniteInternalFuture<?> fut : futs)
+                fut.cancel();
+        }
+    }
+
     /** Active/Active mode means changes made in both clusters. */
     @Test
     public void testActiveActiveReplication() throws Exception {
@@ -346,7 +399,12 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
     }
 
     /** */
-    protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc();
+    private List<List<?>> executeSql(IgniteEx node, String sqlText, Object... args) {
+        return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
+    }
+
+    /** Starts CDC replicating the given cache in active/passive mode and returns the futures of the started processes. */
+    protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache);
 
     /** */
     protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 34082b5..6f87f9d 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -29,11 +29,11 @@ import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 /** */
 public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
     /** {@inheritDoc} */
-    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
+    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
         for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_PASSIVE_CACHE));
+            futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], cache));
 
         return futs;
     }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index 74e7719..c492300 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -80,15 +80,15 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
     }
 
     /** {@inheritDoc} */
-    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
+    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
         for (IgniteEx ex : srcCluster)
-            futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, ACTIVE_PASSIVE_CACHE));
+            futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, cache));
 
         for (int i = 0; i < destCluster.length; i++) {
             futs.add(kafkaToIgnite(
-                ACTIVE_PASSIVE_CACHE,
+                cache,
                 DFLT_TOPIC,
                 destClusterCliCfg[i],
                 i * (DFLT_PARTS / 2),