You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/01/11 15:05:17 UTC

[ignite] branch master updated: IGNITE-8732 partial fix for join: REPLICATED cache LEFT JOIN with PARTITIONED cache (#8637)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e16d080  IGNITE-8732 partial fix for join: REPLICATED cache LEFT JOIN with PARTITIONED cache (#8637)
e16d080 is described below

commit e16d0802d7084e478430e1f379bd0f7af2690b7a
Author: Stanislav Lukyanov <sl...@gridgain.com>
AuthorDate: Mon Jan 11 18:04:55 2021 +0300

    IGNITE-8732 partial fix for join: REPLICATED cache LEFT JOIN with PARTITIONED cache (#8637)
---
 .../processors/cache/query/GridCacheSqlQuery.java  | 25 ++++++-
 .../spi/indexing/IndexingQueryFilterImpl.java      | 34 +++++++++-
 .../cache/query/GridCacheTwoStepQuery.java         | 15 ++++-
 .../processors/query/h2/IgniteH2Indexing.java      | 15 ++++-
 .../query/h2/sql/GridSqlQuerySplitter.java         |  5 +-
 .../processors/query/h2/sql/SplitterUtils.java     | 29 +++++++++
 .../query/h2/twostep/GridMapQueryExecutor.java     | 14 ++--
 .../query/h2/twostep/GridReduceQueryExecutor.java  |  2 +-
 .../query/h2/twostep/msg/GridH2QueryRequest.java   | 11 ++--
 ...gniteCacheJoinPartitionedAndReplicatedTest.java | 64 ++----------------
 .../ReplicatedSqlCustomPartitionsTest.java         | 76 ++++++++++++++++++++++
 .../apache/ignite/sqltests/ReplicatedSqlTest.java  | 42 +-----------
 .../IgniteBinaryCacheQueryTestSuite.java           |  2 +
 13 files changed, 221 insertions(+), 113 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
index fdfed2b..9a837bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheSqlQuery.java
@@ -79,6 +79,11 @@ public class GridCacheSqlQuery implements Message {
     @GridDirectTransient
     private transient boolean hasSubQries;
 
+    /** Flag indicating that the query contains an OUTER JOIN from REPLICATED to PARTITIONED. */
+    @GridToStringInclude
+    @GridDirectTransient
+    private transient boolean hasOuterJoinReplicatedPartitioned;
+
     /**
      * For {@link Message}.
      */
@@ -359,7 +364,7 @@ public class GridCacheSqlQuery implements Message {
     }
 
     /**
-     * @param hasSubQries Flag indicating that query contains sub-queries.
+     * @param hasSubQries Flag indicating that the query contains sub-queries.
      *
      * @return {@code this}.
      */
@@ -368,4 +373,22 @@ public class GridCacheSqlQuery implements Message {
 
         return this;
     }
+
+    /**
+     * @return {@code true} if the query contains an OUTER JOIN from REPLICATED to PARTITIONED.
+     */
+    public boolean hasOuterJoinReplicatedPartitioned() {
+        return hasOuterJoinReplicatedPartitioned;
+    }
+
+    /**
+     * @param hasOuterJoinReplicatedPartitioned Flag indicating that the query contains an OUTER JOIN from REPLICATED to PARTITIONED.
+     *
+     * @return {@code this}.
+     */
+    public GridCacheSqlQuery hasOuterJoinReplicatedPartitioned(boolean hasOuterJoinReplicatedPartitioned) {
+        this.hasOuterJoinReplicatedPartitioned = hasOuterJoinReplicatedPartitioned;
+
+        return this;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java
index c1e03cd..54fa554 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/indexing/IndexingQueryFilterImpl.java
@@ -38,14 +38,30 @@ public class IndexingQueryFilterImpl implements IndexingQueryFilter {
     private final HashSet<Integer> parts;
 
     /**
+     * Treat replicated as partitioned.
+     * This was introduced as a partial solution for "[IGNITE-8732] SQL: REPLICATED cache cannot be left-joined
+     * to PARTITIONED".
+     *
+     * If this flag is set, only primary partitions of the REPLICATED caches will be scanned (same as with
+     * PARTITIONED caches).
+     *
+     * This flag requires the REPLICATED and PARTITIONED to have the same number of partitions and affinity,
+     * and requires the JOIN to be on an affinity key of both caches.
+     *
+     * This flag is incompatible with distributed joins.
+     */
+    private final boolean treatReplicatedAsPartitioned;
+
+    /**
      * Constructor.
      *
      * @param ctx Kernal context.
      * @param topVer Topology version.
      * @param partsArr Partitions array.
+     * @param treatReplicatedAsPartitioned If true, only primary partitions of replicated caches will be used.
      */
     public IndexingQueryFilterImpl(GridKernalContext ctx, @Nullable AffinityTopologyVersion topVer,
-        @Nullable int[] partsArr) {
+        @Nullable int[] partsArr, boolean treatReplicatedAsPartitioned) {
         this.ctx = ctx;
 
         this.topVer = topVer != null ? topVer : AffinityTopologyVersion.NONE;
@@ -58,6 +74,20 @@ public class IndexingQueryFilterImpl implements IndexingQueryFilter {
             for (int part : partsArr)
                 parts.add(part);
         }
+
+        this.treatReplicatedAsPartitioned = treatReplicatedAsPartitioned;
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param ctx Kernal context.
+     * @param topVer Topology version.
+     * @param partsArr Partitions array.
+     */
+    public IndexingQueryFilterImpl(GridKernalContext ctx, @Nullable AffinityTopologyVersion topVer,
+            @Nullable int[] partsArr) {
+        this(ctx, topVer, partsArr, false);
     }
 
     /** {@inheritDoc} */
@@ -65,7 +95,7 @@ public class IndexingQueryFilterImpl implements IndexingQueryFilter {
         final GridCacheAdapter<Object, Object> cache = ctx.cache().internalCache(cacheName);
 
         // REPLICATED -> nothing to filter (explicit partitions are not supported).
-        if (cache.context().isReplicated())
+        if (cache.context().isReplicated() && !treatReplicatedAsPartitioned)
             return null;
 
         return new IndexingQueryCacheFilter(cache.context().affinity(), parts, topVer,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
index c4f4143..85a50b4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheTwoStepQuery.java
@@ -69,8 +69,10 @@ public class GridCacheTwoStepQuery {
     /** Number of positional arguments in the sql. */
     private final int paramsCnt;
 
+    /** True if need to treat replicated as partitioned (for outer joins). */
+    private final boolean treatReplicatedAsPartitioned;
+
     /**
-     *
      * @param originalSql Original SQL.
      * @param paramsCnt Parameters count.
      * @param tbls Tables.
@@ -98,7 +100,8 @@ public class GridCacheTwoStepQuery {
         PartitionResult derivedPartitions,
         List<Integer> cacheIds,
         boolean mvccEnabled,
-        boolean locSplit
+        boolean locSplit,
+        boolean treatReplicatedAsPartitioned
     ) {
         assert !F.isEmpty(mapQrys);
 
@@ -115,6 +118,7 @@ public class GridCacheTwoStepQuery {
         this.locSplit = locSplit;
         this.mapQrys = mapQrys;
         this.replicatedOnly = replicatedOnly;
+        this.treatReplicatedAsPartitioned = treatReplicatedAsPartitioned;
     }
 
     /**
@@ -233,6 +237,13 @@ public class GridCacheTwoStepQuery {
         return paramsCnt;
     }
 
+    /**
+     * @return {@code true} if need to treat replicated as partitioned (for outer joins).
+     */
+    public boolean treatReplicatedAsPartitioned() {
+        return treatReplicatedAsPartitioned;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheTwoStepQuery.class, this);
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index f443f6d..1f7a59a 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -2539,7 +2539,20 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     /** {@inheritDoc} */
     @Override public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer,
         @Nullable final int[] parts) {
-        return new IndexingQueryFilterImpl(ctx, topVer, parts);
+        return backupFilter(topVer, parts, false);
+    }
+
+    /**
+     * Returns backup filter.
+     *
+     * @param topVer Topology version.
+     * @param parts Partitions.
+     * @param treatReplicatedAsPartitioned true if need to treat replicated as partitioned (for outer joins).
+     * @return Backup filter.
+     */
+    public IndexingQueryFilter backupFilter(@Nullable final AffinityTopologyVersion topVer, @Nullable final int[] parts,
+            boolean treatReplicatedAsPartitioned) {
+        return new IndexingQueryFilterImpl(ctx, topVer, parts, treatReplicatedAsPartitioned);
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
index 135979b..beadf28 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/GridSqlQuerySplitter.java
@@ -310,6 +310,7 @@ public class GridSqlQuerySplitter {
         List<Integer> cacheIds = H2Utils.collectCacheIds(idx, null, splitter.tbls);
         boolean mvccEnabled = H2Utils.collectMvccEnabled(idx, cacheIds);
         boolean replicatedOnly = splitter.mapSqlQrys.stream().noneMatch(GridCacheSqlQuery::isPartitioned);
+        boolean treatReplicatedAsPartitioned = splitter.mapSqlQrys.stream().anyMatch(GridCacheSqlQuery::hasOuterJoinReplicatedPartitioned);
 
         H2Utils.checkQuery(idx, cacheIds, splitter.tbls);
 
@@ -327,7 +328,8 @@ public class GridSqlQuerySplitter {
             splitter.extractor.mergeMapQueries(splitter.mapSqlQrys),
             cacheIds,
             mvccEnabled,
-            locSplit
+            locSplit,
+            treatReplicatedAsPartitioned
         );
     }
 
@@ -1265,6 +1267,7 @@ public class GridSqlQuerySplitter {
         map.sortColumns(mapQry.sort());
         map.partitioned(SplitterUtils.hasPartitionedTables(mapQry));
         map.hasSubQueries(SplitterUtils.hasSubQueries(mapQry));
+        map.hasOuterJoinReplicatedPartitioned(SplitterUtils.hasOuterJoinReplicatedPartitioned(mapQry.from()));
 
         if (map.isPartitioned() && canExtractPartitions)
             map.derivedPartitions(extractor.extract(mapQry));
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterUtils.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterUtils.java
index 3fcfcbd..fe47c68 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterUtils.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/sql/SplitterUtils.java
@@ -165,6 +165,35 @@ public class SplitterUtils {
     }
 
     /**
+     * Checks whether the expression has an OUTER JOIN from replicated to partitioned.
+     *
+     * This is used to infer the `treatReplicatedAsPartitioned` flag
+     * to eventually pass it to {@link org.apache.ignite.spi.indexing.IndexingQueryFilterImpl}.
+     *
+     * @param from FROM expression.
+     * @return {@code true} if the expression has an OUTER JOIN from replicated to partitioned.
+     */
+    public static boolean hasOuterJoinReplicatedPartitioned(GridSqlAst from) {
+        boolean isRightPartitioned = false;
+        while (from instanceof GridSqlJoin) {
+            GridSqlJoin join = (GridSqlJoin)from;
+
+            assert !(join.rightTable() instanceof GridSqlJoin);
+
+            isRightPartitioned = isRightPartitioned || hasPartitionedTables(join.rightTable());
+
+            if (join.isLeftOuter()) {
+                boolean isLeftPartitioned = hasPartitionedTables(join.leftTable());
+                return !isLeftPartitioned && isRightPartitioned;
+            }
+
+            from = join.leftTable();
+        }
+
+        return false;
+    }
+
+    /**
      * @param ast Reduce query AST.
      * @param rdcQry Reduce query string.
      */
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index aa79094..7d0d4ee 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -214,6 +214,7 @@ public class GridMapQueryExecutor {
         boolean explain = req.isFlagSet(GridH2QueryRequest.FLAG_EXPLAIN);
         boolean replicated = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED);
         final boolean lazy = req.isFlagSet(GridH2QueryRequest.FLAG_LAZY);
+        boolean treatReplicatedAsPartitioned = req.isFlagSet(GridH2QueryRequest.FLAG_REPLICATED_AS_PARTITIONED);
 
         Boolean dataPageScanEnabled = req.isDataPageScanEnabled();
 
@@ -255,7 +256,9 @@ public class GridMapQueryExecutor {
                             params,
                             lazy,
                             req.mvccSnapshot(),
-                            dataPageScanEnabled);
+                            dataPageScanEnabled,
+                            treatReplicatedAsPartitioned
+                        );
 
                         return null;
                     }
@@ -280,7 +283,9 @@ public class GridMapQueryExecutor {
             params,
             lazy,
             req.mvccSnapshot(),
-            dataPageScanEnabled);
+            dataPageScanEnabled,
+            treatReplicatedAsPartitioned
+        );
     }
 
     /**
@@ -321,7 +326,8 @@ public class GridMapQueryExecutor {
         final Object[] params,
         boolean lazy,
         @Nullable final MvccSnapshot mvccSnapshot,
-        Boolean dataPageScanEnabled
+        Boolean dataPageScanEnabled,
+        boolean treatReplicatedAsPartitioned
     ) {
         boolean performanceStatsEnabled = ctx.performanceStatistics().enabled();
 
@@ -379,7 +385,7 @@ public class GridMapQueryExecutor {
 
             qctx = new QueryContext(
                 segmentId,
-                h2.backupFilter(topVer, parts),
+                h2.backupFilter(topVer, parts, treatReplicatedAsPartitioned),
                 distributedJoinCtx,
                 mvccSnapshot,
                 reserved,
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index c248330..2cab5d0 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -775,7 +775,7 @@ public class GridReduceQueryExecutor {
 
         return GridH2QueryRequest.queryFlags(qry.distributedJoins(),
             enforceJoinOrder, lazy, qry.isReplicatedOnly(),
-            qry.explain(), dataPageScanEnabled);
+            qry.explain(), dataPageScanEnabled, qry.treatReplicatedAsPartitioned());
     }
 
     /**
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
index a662495..561cf18 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/msg/GridH2QueryRequest.java
@@ -68,10 +68,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
     public static final int FLAG_ENFORCE_JOIN_ORDER = 1 << 1;
 
     /**
-     * Unused. Keep for backward compatibility.
+     * Whether to treat replicated as partitioned (for outer joins).
      */
-    @SuppressWarnings("unused")
-    public static final int FLAG_UNUSED = 1 << 2;
+    public static final int FLAG_REPLICATED_AS_PARTITIONED = 1 << 2;
 
     /**
      * If it is an EXPLAIN command.
@@ -479,7 +478,8 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
         boolean lazy,
         boolean replicatedOnly,
         boolean explain,
-        Boolean dataPageScanEnabled) {
+        Boolean dataPageScanEnabled,
+        boolean treatReplicatedAsPartitioned) {
         int flags = enforceJoinOrder ? FLAG_ENFORCE_JOIN_ORDER : 0;
 
         // Distributed joins flag is set if it is either reald
@@ -497,6 +497,9 @@ public class GridH2QueryRequest implements Message, GridCacheQueryMarshallable {
 
         flags = setDataPageScanEnabled(flags, dataPageScanEnabled);
 
+        if (treatReplicatedAsPartitioned)
+            flags |= FLAG_REPLICATED_AS_PARTITIONED;
+
         return flags;
     }
 
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
index 2a73dad..c607e64 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheJoinPartitionedAndReplicatedTest.java
@@ -20,20 +20,17 @@ package org.apache.ignite.internal.processors.cache;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
-import javax.cache.CacheException;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.cache.QueryEntity;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
 import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
@@ -44,7 +41,6 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
 /**
  *
  */
-@Ignore("https://issues.apache.org/jira/browse/IGNITE-5016")
 public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstractTest {
     /** */
     private static final String PERSON_CACHE = "person";
@@ -55,6 +51,9 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
     /** */
     private static final String ORG_CACHE_REPLICATED = "orgRepl";
 
+    /** */
+    private static final int NUMBER_OF_PARTITIONS = 32;
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
@@ -125,6 +124,7 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
         ccfg.setWriteSynchronizationMode(FULL_SYNC);
         ccfg.setAtomicityMode(ATOMIC);
         ccfg.setBackups(1);
+        ccfg.setAffinity(new RendezvousAffinityFunction(false, NUMBER_OF_PARTITIONS));
 
         return ccfg;
     }
@@ -156,9 +156,8 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-5016")
     @Test
-    public void testJoin() throws Exception {
+    public void testJoin() {
         Ignite client = grid(2);
 
         IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
@@ -208,57 +207,6 @@ public class IgniteCacheJoinPartitionedAndReplicatedTest extends GridCommonAbstr
     }
 
     /**
-     */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-5016")
-    @Test
-    public void testReplicatedToPartitionedLeftJoin() {
-        Ignite client = grid(2);
-
-        IgniteCache<Object, Object> personCache = client.cache(PERSON_CACHE);
-        IgniteCache<Object, Object> orgCache = client.cache(ORG_CACHE);
-        IgniteCache<Object, Object> orgCacheRepl = client.cache(ORG_CACHE_REPLICATED);
-
-        List<Integer> keys = primaryKeys(ignite(0).cache(PERSON_CACHE), 3, 200_000);
-
-        orgCache.put(keys.get(0), new Organization(0, "org1"));
-        orgCacheRepl.put(keys.get(0), new Organization(0, "org1"));
-        personCache.put(keys.get(1), new Person(0, "p1"));
-        personCache.put(keys.get(2), new Person(0, "p2"));
-
-        checkQuery("select o.name, p._key, p.name " +
-            "from \"person\".Person p left join \"org\".Organization o " +
-            "on (p.orgId = o.id)", orgCache, 2);
-
-        checkQuery("select o.name, p._key, p.name " +
-            "from \"org\".Organization o right join \"person\".Person p " +
-            "on (p.orgId = o.id)", orgCache, 2);
-
-        checkQuery("select o.name, p._key, p.name " +
-            "from \"person\".Person p left join \"org\".Organization o " +
-            "on (p.orgId = o.id)", personCache, 2);
-
-        checkQuery("select o.name, p._key, p.name " +
-            "from \"org\".Organization o right join \"person\".Person p " +
-            "on (p.orgId = o.id)", personCache, 2);
-    }
-
-    /**
-     * @param sql SQL.
-     * @param cache Cache.
-     */
-    private void checkQueryFails(final String sql, final IgniteCache<Object, Object> cache) {
-        GridTestUtils.assertThrows(log, new Callable<Void>() {
-            @Override public Void call() throws Exception {
-                SqlFieldsQuery qry = new SqlFieldsQuery(sql);
-
-                cache.query(qry).getAll();
-
-                return null;
-            }
-        }, CacheException.class, null);
-    }
-
-    /**
      * @param sql SQL.
      * @param cache Cache.
      * @param expSize Expected results size.
diff --git a/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlCustomPartitionsTest.java b/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlCustomPartitionsTest.java
new file mode 100644
index 0000000..0c8ab2b
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlCustomPartitionsTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.sqltests;
+
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.junit.Test;
+
+/**
+ * Includes all base sql test plus tests that make sense in replicated mode with a non-default number of partitions.
+ */
+public class ReplicatedSqlCustomPartitionsTest extends ReplicatedSqlTest {
+    /** Test partitions count. */
+    private static final int NUM_OF_PARTITIONS = 509;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCacheConfiguration(
+                new CacheConfiguration("partitioned" + NUM_OF_PARTITIONS + "*")
+                    .setAffinity(new RendezvousAffinityFunction(false, NUM_OF_PARTITIONS)),
+                new CacheConfiguration("replicated" + NUM_OF_PARTITIONS + "*")
+                    .setCacheMode(CacheMode.REPLICATED)
+                    .setAffinity(new RendezvousAffinityFunction(false, NUM_OF_PARTITIONS))
+            );
+    }
+
+    /**
+     * Create and fill common tables in replicated mode.
+     * Also create additional department table in partitioned mode to
+     * test mixed partitioned/replicated scenarios.
+     */
+    @Override protected void setupData() {
+        createTables("template=replicated" + NUM_OF_PARTITIONS);
+
+        fillCommonData();
+
+        createDepartmentTable(DEP_PART_TAB, "template=partitioned" + NUM_OF_PARTITIONS);
+
+        fillDepartmentTable(DEP_PART_TAB);
+    }
+
+    /**
+     * Check LEFT JOIN with collocated data of replicated and partitioned tables.
+     * This test relies on having the same number of partitions in replicated and partitioned caches
+     */
+    @Test
+    public void testLeftJoinReplicatedPartitioned() {
+        checkLeftJoinEmployeeDepartment(DEP_PART_TAB);
+    }
+
+    /**
+     * Check RIGHT JOIN with collocated data of partitioned and replicated tables.
+     */
+    @Test
+    public void testRightJoinPartitionedReplicated() {
+        checkRightJoinDepartmentEmployee(DEP_PART_TAB);
+    }
+}
diff --git a/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java b/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
index 09a6f85..771d121 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/sqltests/ReplicatedSqlTest.java
@@ -27,7 +27,7 @@ import org.junit.Test;
  */
 public class ReplicatedSqlTest extends BaseSqlTest {
     /** Name of the department table created in partitioned mode. */
-    private String DEP_PART_TAB = "DepartmentPart";
+    static final String DEP_PART_TAB = "DepartmentPart";
 
     /**
      * Create and fill common tables in replicated mode.
@@ -39,9 +39,9 @@ public class ReplicatedSqlTest extends BaseSqlTest {
 
         fillCommonData();
 
-        createDepartmentTable("DepartmentPart", "template=partitioned");
+        createDepartmentTable(DEP_PART_TAB, "template=partitioned");
 
-        fillDepartmentTable("DepartmentPart");
+        fillDepartmentTable(DEP_PART_TAB);
     }
 
     /**
@@ -149,15 +149,6 @@ public class ReplicatedSqlTest extends BaseSqlTest {
     }
 
     /**
-     * Checks distributed LEFT JOIN of replicated and partitioned tables.
-     */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-8732")
-    @Test
-    public void testLeftDistributedJoinReplicatedPartitioned() {
-        checkLeftDistributedJoinReplicatedWith(DEP_PART_TAB);
-    }
-
-    /**
      * Checks distributed LEFT JOIN of specified and replicated tables.
      *
      * @param depTab department table name.
@@ -228,15 +219,6 @@ public class ReplicatedSqlTest extends BaseSqlTest {
     }
 
     /**
-     * Checks distributed RIGHT JOIN of partitioned and replicated tables.
-     */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-8732")
-    @Test
-    public void testRightDistributedJoinPartitionedReplicated() {
-        checkRightDistributedJoinWithReplicated(DEP_PART_TAB);
-    }
-
-    /**
      * Checks distributed RIGHT JOIN of replicated and partitioned tables.
      */
     @Test
@@ -327,15 +309,6 @@ public class ReplicatedSqlTest extends BaseSqlTest {
     }
 
     /**
-     * Check LEFT JOIN with collocated data of replicated and partitioned tables.
-     */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-8732")
-    @Test
-    public void testLeftJoinReplicatedPartitioned() {
-        checkLeftJoinEmployeeDepartment(DEP_PART_TAB);
-    }
-
-    /**
      * Check LEFT JOIN with collocated data of partitioned and replicated tables.
      */
     @Test
@@ -350,13 +323,4 @@ public class ReplicatedSqlTest extends BaseSqlTest {
     public void testRightJoinReplicatedPartitioned() {
         checkRightJoinEmployeeDepartment(DEP_PART_TAB);
     }
-
-    /**
-     * Check RIGHT JOIN with collocated data of partitioned and replicated tables.
-     */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-8732")
-    @Test
-    public void testRightJoinPartitionedReplicated() {
-        checkRightJoinDepartmentEmployee(DEP_PART_TAB);
-    }
 }
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index cd5ffe5..8a17525 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -279,6 +279,7 @@ import org.apache.ignite.internal.sql.SqlParserTransactionalKeywordsSelfTest;
 import org.apache.ignite.internal.sql.SqlParserUserSelfTest;
 import org.apache.ignite.spi.communication.tcp.GridOrderedMessageCancelSelfTest;
 import org.apache.ignite.sqltests.PartitionedSqlTest;
+import org.apache.ignite.sqltests.ReplicatedSqlCustomPartitionsTest;
 import org.apache.ignite.sqltests.ReplicatedSqlTest;
 import org.apache.ignite.util.KillCommandsMXBeanTest;
 import org.apache.ignite.util.KillCommandsSQLTest;
@@ -299,6 +300,7 @@ import org.junit.runners.Suite;
 
     PartitionedSqlTest.class,
     ReplicatedSqlTest.class,
+    ReplicatedSqlCustomPartitionsTest.class,
 
     SqlParserCreateIndexSelfTest.class,
     SqlParserDropIndexSelfTest.class,