You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/07/14 07:14:19 UTC
[ignite-extensions] branch master updated: IGNITE-15116 Test of SQL
query replication added. (#70)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 41991eb IGNITE-15116 Test of SQL query replication added. (#70)
41991eb is described below
commit 41991eb6a733d52d7f47c423a0142e3c12d5fb0e
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Jul 14 10:14:14 2021 +0300
IGNITE-15116 Test of SQL query replication added. (#70)
---
.../org/apache/ignite/cdc/CdcEventsApplier.java | 9 ++-
.../apache/ignite/cdc/AbstractReplicationTest.java | 72 +++++++++++++++++++---
.../cdc/CdcIgniteToIgniteReplicationTest.java | 4 +-
.../ignite/cdc/kafka/CdcKafkaReplicationTest.java | 6 +-
4 files changed, 77 insertions(+), 14 deletions(-)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index a8fe3ff..9137f2a 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -85,7 +85,7 @@ public abstract class CdcEventsApplier {
// IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
ignite().cache(cacheName);
- return ignite().cachex(cacheName);
+ return ignite().cachex(cacheName).keepBinary();
}
}
@@ -105,7 +105,12 @@ public abstract class CdcEventsApplier {
if (evt.value() != null) {
applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
- CacheObject val = new CacheObjectImpl(evt.value(), null);
+ CacheObject val;
+
+ if (evt.value() instanceof CacheObject)
+ val = (CacheObject)evt.value();
+ else
+ val = new CacheObjectImpl(evt.value(), null);
updBatch.put(key, new GridCacheDrInfo(val,
new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 17468df..7e46dc3 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -24,10 +24,12 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
+import java.util.function.Function;
import java.util.stream.IntStream;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
@@ -36,6 +38,7 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
@@ -144,13 +147,19 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
}}));
if (!cfg.isClientMode()) {
- CacheVersionConflictResolverPluginProvider<?> cfgPlugin = new CacheVersionConflictResolverPluginProvider<>();
+ CacheVersionConflictResolverPluginProvider<?> cfgPlugin1 = new CacheVersionConflictResolverPluginProvider<>();
- cfgPlugin.setClusterId(clusterId);
- cfgPlugin.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
- cfgPlugin.setConflictResolveField("reqId");
+ cfgPlugin1.setClusterId(clusterId);
+ cfgPlugin1.setCaches(new HashSet<>(Arrays.asList(ACTIVE_PASSIVE_CACHE, ACTIVE_ACTIVE_CACHE)));
+ cfgPlugin1.setConflictResolveField("reqId");
- cfg.setPluginProviders(cfgPlugin);
+ CacheVersionConflictResolverPluginProvider<?> cfgPlugin2 = new CacheVersionConflictResolverPluginProvider<>();
+
+ cfgPlugin2.setClusterId(clusterId);
+ cfgPlugin2.setCaches(new HashSet<>(Arrays.asList("T1")));
+ cfgPlugin2.setConflictResolveField("ID");
+
+ cfg.setPluginProviders(cfgPlugin1);
cfg.setDataStorageConfiguration(new DataStorageConfiguration()
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
@@ -223,7 +232,7 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
/** Active/Passive mode means changes made only in one cluster. */
@Test
public void testActivePassiveReplication() throws Exception {
- List<IgniteInternalFuture<?>> futs = startActivePassiveCdc();
+ List<IgniteInternalFuture<?>> futs = startActivePassiveCdc(ACTIVE_PASSIVE_CACHE);
try {
IgniteCache<Integer, ConflictResolvableTestData> destCache = createCache(destCluster[0], ACTIVE_PASSIVE_CACHE);
@@ -252,6 +261,50 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
}
}
+ /** Checks Active/Passive replication of a table created and modified through SQL: rows inserted and deleted in the source cluster must appear and disappear in the destination cluster. */
+ @Test
+ public void testActivePassiveSqlDataReplication() throws Exception {
+ String createTbl = "CREATE TABLE T1(ID BIGINT PRIMARY KEY, NAME VARCHAR) WITH \"CACHE_NAME=T1,VALUE_TYPE=T1Type\"";
+ String insertQry = "INSERT INTO T1 VALUES(?, ?)";
+ String deleteQry = "DELETE FROM T1";
+ // Create the same table in both clusters; the statement is run on the first node of each.
+ executeSql(srcCluster[0], createTbl);
+ executeSql(destCluster[0], createTbl);
+ // Write and then remove one row on the destination side first. NOTE(review): purpose is not evident here - presumably initializes the destination table/value type before replication; confirm.
+ executeSql(destCluster[0], insertQry, -1, "Name-1");
+ executeSql(destCluster[0], deleteQry);
+ // Populate the source table with KEYS_CNT rows of the form (id, "Name" + id).
+ IntStream.range(0, KEYS_CNT).forEach(id -> executeSql(srcCluster[0], insertQry, id, "Name" + id));
+ // CDC for cache "T1" is started only after the source rows have been inserted.
+ List<IgniteInternalFuture<?>> futs = startActivePassiveCdc("T1");
+
+ try {
+ Function<Integer, GridAbsPredicate> waitForTblSz = expSz -> () -> {
+ long cnt = (Long)executeSql(destCluster[0], "SELECT COUNT(*) FROM T1").get(0).get(0);
+
+ return cnt == expSz;
+ };
+ // Wait (up to the test timeout) until the destination table holds all KEYS_CNT rows.
+ assertTrue(waitForCondition(waitForTblSz.apply(KEYS_CNT), getTestTimeout()));
+
+ // Verify that the replicated rows, ordered by ID, match what was inserted at the source.
+ List<List<?>> data = executeSql(destCluster[0], "SELECT ID, NAME FROM T1 ORDER BY ID");
+
+ for (int i = 0; i < KEYS_CNT; i++) {
+ assertEquals((long)i, data.get(i).get(0));
+ assertEquals("Name" + i, data.get(i).get(1));
+ }
+ // Delete everything at the source and wait for the destination table to become empty.
+ executeSql(srcCluster[0], deleteQry);
+
+ assertTrue(waitForCondition(waitForTblSz.apply(0), getTestTimeout()));
+ }
+ finally {
+ for (IgniteInternalFuture<?> fut : futs)
+ fut.cancel();
+ }
+ }
+
/** Active/Active mode means changes made in both clusters. */
@Test
public void testActiveActiveReplication() throws Exception {
@@ -346,7 +399,12 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
}
/** */
- protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc();
+ private List<List<?>> executeSql(IgniteEx node, String sqlText, Object... args) { // Runs sqlText on the given node with args bound as query arguments and returns all result rows.
+ return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
+ }
+
+ /** */
+ protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache);
/** */
protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 34082b5..6f87f9d 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -29,11 +29,11 @@ import static org.apache.ignite.testframework.GridTestUtils.runAsync;
/** */
public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
/** {@inheritDoc} */
- @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
+ @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (int i = 0; i < srcCluster.length; i++)
- futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_PASSIVE_CACHE));
+ futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], cache));
return futs;
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index 74e7719..c492300 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -80,15 +80,15 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
}
/** {@inheritDoc} */
- @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
+ @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (IgniteEx ex : srcCluster)
- futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, ACTIVE_PASSIVE_CACHE));
+ futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, cache));
for (int i = 0; i < destCluster.length; i++) {
futs.add(kafkaToIgnite(
- ACTIVE_PASSIVE_CACHE,
+ cache,
DFLT_TOPIC,
destClusterCliCfg[i],
i * (DFLT_PARTS / 2),