Posted to commits@ignite.apache.org by vo...@apache.org on 2019/02/01 08:05:33 UTC

[ignite] branch master updated: IGNITE-10487: SQL: Tests for DML with partition pruning. This closes #5837.

This is an automated email from the ASF dual-hosted git repository.

vozerov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 751ab2a  IGNITE-10487: SQL: Tests for DML with partition pruning. This closes #5837.
751ab2a is described below

commit 751ab2a488eeb5af32280125673907adb3ac9262
Author: tledkov-gridgain <tl...@gridgain.com>
AuthorDate: Fri Feb 1 11:05:24 2019 +0300

    IGNITE-10487: SQL: Tests for DML with partition pruning. This closes #5837.
---
 .../twostep/AbstractPartitionPruningBaseTest.java  | 565 +++++++++++++++++++++
 .../twostep/DmlSelectPartitionPruningSelfTest.java | 235 +++++++++
 .../h2/twostep/JoinPartitionPruningSelfTest.java   | 485 ++----------------
 .../IgniteBinaryCacheQueryTestSuite.java           |   2 +
 4 files changed, 831 insertions(+), 456 deletions(-)
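
For readers skimming the diff: partition pruning lets the SQL engine derive the set of candidate partitions from the WHERE clause, so a single-key statement is routed only to the node that owns that key's partition instead of being broadcast to the whole cluster. A minimal illustration of the idea (the running Ignite instance and the table are assumed to exist; names are illustrative, not taken from this commit):

    // With pruning, this single-key UPDATE should reach only the
    // partition that owns key '1'.
    SqlFieldsQuery qry = new SqlFieldsQuery("UPDATE t1 SET v1 = ? WHERE k1 = ?")
        .setArgs("new", "1");

    ignite.cache("t1").query(qry).getAll();

    // The owning partition can be computed on the client for comparison.
    int expectedPart = ignite.affinity("t1").partition("1");

The tests below verify exactly this routing by intercepting outgoing query requests and recording the partitions they carry.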

diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractPartitionPruningBaseTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractPartitionPruningBaseTest.java
new file mode 100644
index 0000000..493b264
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/AbstractPartitionPruningBaseTest.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxQueryEnlistRequest;
+import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Base class for partition pruning tests.
+ */
+@SuppressWarnings("deprecation")
+@RunWith(JUnit4.class)
+public abstract class AbstractPartitionPruningBaseTest extends GridCommonAbstractTest {
+    /** Number of intercepted requests. */
+    private static final AtomicInteger INTERCEPTED_REQS = new AtomicInteger();
+
+    /** Partitions tracked during query execution. */
+    private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS = new ConcurrentSkipListSet<>();
+
+    /** Nodes tracked during query execution. */
+    private static final ConcurrentSkipListSet<ClusterNode> INTERCEPTED_NODES = new ConcurrentSkipListSet<>();
+
+    /** IP finder. */
+    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder().setShared(true);
+
+    /** Memory. */
+    protected static final String REGION_MEM = "mem";
+
+    /** Disk. */
+    protected static final String REGION_DISK = "disk";
+
+    /** Client node name. */
+    private static final String CLI_NAME = "cli";
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        cleanPersistenceDir();
+
+        startGrid(getConfiguration("srv1"));
+        startGrid(getConfiguration("srv2"));
+        startGrid(getConfiguration("srv3"));
+
+        startGrid(getConfiguration(CLI_NAME).setClientMode(true));
+
+        client().cluster().active(true);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        Ignite cli = client();
+
+        cli.destroyCaches(cli.cacheNames());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTestsStopped();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
+        return super.getConfiguration(name)
+            .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER))
+            .setCommunicationSpi(new TrackingTcpCommunicationSpi())
+            .setLocalHost("127.0.0.1")
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDataRegionConfigurations(new DataRegionConfiguration()
+                    .setName(REGION_DISK)
+                    .setPersistenceEnabled(true))
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setName(REGION_MEM)
+                    .setPersistenceEnabled(false)));
+    }
+
+    /**
+     * Create PARTITIONED table.
+     *
+     * @param name Name.
+     * @param cols Columns.
+     */
+    protected void createPartitionedTable(String name, Object... cols) {
+        createPartitionedTable(false, name, cols);
+    }
+
+    /**
+     * Create PARTITIONED table.
+     *
+     * @param mvcc MVCC flag.
+     * @param name Name.
+     * @param cols Columns.
+     */
+    protected void createPartitionedTable(boolean mvcc, String name, Object... cols) {
+        createTable0(name, false, mvcc, cols);
+    }
+
+    /**
+     * Create REPLICATED table.
+     *
+     * @param name Name.
+     * @param cols Columns.
+     */
+    protected void createReplicatedTable(String name, Object... cols) {
+        createReplicatedTable(false, name, cols);
+    }
+
+    /**
+     * Create REPLICATED table.
+     *
+     * @param mvcc MVCC flag.
+     * @param name Name.
+     * @param cols Columns.
+     */
+    protected void createReplicatedTable(boolean mvcc, String name, Object... cols) {
+        createTable0(name, true, mvcc, cols);
+    }
+
+    /**
+     * Internal CREATE TABLE routine.
+     *
+     * @param name Name.
+     * @param replicated Replicated table flag.
+     * @param mvcc MVCC flag.
+     * @param cols Columns.
+     */
+    @SuppressWarnings("StringConcatenationInsideStringBufferAppend")
+    private void createTable0(String name, boolean replicated, boolean mvcc, Object... cols) {
+        List<String> pkCols = new ArrayList<>();
+
+        String affCol = null;
+
+        StringBuilder sql = new StringBuilder("CREATE TABLE ").append(name).append("(");
+        for (Object col : cols) {
+            Column col0 = col instanceof Column ? (Column)col : new Column((String)col, false, false);
+
+            sql.append(col0.name()).append(" VARCHAR, ");
+
+            if (col0.pk())
+                pkCols.add(col0.name());
+
+            if (col0.affinity()) {
+                if (affCol != null)
+                    throw new IllegalStateException("Only one affinity column is allowed: " + col0.name());
+
+                affCol = col0.name();
+            }
+        }
+
+        if (pkCols.isEmpty())
+            throw new IllegalStateException("No PKs!");
+
+        sql.append("PRIMARY KEY (");
+
+        boolean firstPkCol = true;
+
+        for (String pkCol : pkCols) {
+            if (firstPkCol)
+                firstPkCol = false;
+            else
+                sql.append(", ");
+
+            sql.append(pkCol);
+        }
+
+        sql.append(")");
+
+        sql.append(") WITH \"template=" + (replicated ? "replicated" : "partitioned"));
+        sql.append(", CACHE_NAME=" + name);
+
+        if (affCol != null) {
+            sql.append(", AFFINITY_KEY=" + affCol);
+            sql.append(", KEY_TYPE=" + name + "_key");
+        }
+
+        if (mvcc)
+            sql.append(", atomicity=TRANSACTIONAL_SNAPSHOT");
+
+        sql.append("\"");
+
+        executeSql(sql.toString());
+    }
+
+    /**
+     * Execute query with all possible combinations of argument placeholders.
+     *
+     * @param sql SQL.
+     * @param resConsumer Result consumer.
+     * @param args Arguments.
+     */
+    public void execute(String sql, Consumer<List<List<?>>> resConsumer, Object... args) {
+        System.out.println(">>> TEST COMBINATION: " + sql);
+
+        // Execute query as is.
+        List<List<?>> res = executeSingle(sql, args);
+
+        resConsumer.accept(res);
+
+        // Start filling arguments recursively.
+        if (args != null && args.length > 0)
+            executeCombinations0(sql, resConsumer, new HashSet<>(), args);
+
+        System.out.println();
+    }
+
+    /**
+     * Execute query with all possible combinations of argument placeholders.
+     *
+     * @param sql SQL.
+     * @param resConsumer Result consumer.
+     * @param executedSqls Already executed SQLs.
+     * @param args Arguments.
+     */
+    private void executeCombinations0(
+        String sql,
+        Consumer<List<List<?>>> resConsumer,
+        Set<String> executedSqls,
+        Object... args
+    ) {
+        assert args != null && args.length > 0;
+
+        // Get argument positions.
+        List<Integer> paramPoss = new ArrayList<>();
+
+        int pos = 0;
+
+        while (true) {
+            int paramPos = sql.indexOf('?', pos);
+
+            if (paramPos == -1)
+                break;
+
+            paramPoss.add(paramPos);
+
+            pos = paramPos + 1;
+        }
+
+        for (int i = 0; i < args.length; i++) {
+            // Prepare new SQL and arguments.
+            int paramPos = paramPoss.get(i);
+
+            String newSql = sql.substring(0, paramPos) + args[i] + sql.substring(paramPos + 1);
+
+            Object[] newArgs = new Object[args.length - 1];
+
+            int newArgsPos = 0;
+
+            for (int j = 0; j < args.length; j++) {
+                if (j != i)
+                    newArgs[newArgsPos++] = args[j];
+            }
+
+            // Execute if this combination was never executed before.
+            if (executedSqls.add(newSql)) {
+                List<List<?>> res = executeSingle(newSql, newArgs);
+
+                resConsumer.accept(res);
+            }
+
+            // Continue recursively.
+            if (newArgs.length > 0)
+                executeCombinations0(newSql, resConsumer, executedSqls, newArgs);
+        }
+    }
+
+    /**
+     * Execute SQL query.
+     *
+     * @param sql SQL.
+     * @param args Query arguments.
+     * @return Query results.
+     */
+    protected List<List<?>> executeSingle(String sql, Object... args) {
+        clearIoState();
+
+        return executeSql(sql, args);
+    }
+
+    /**
+     * Execute SQL query.
+     *
+     * @param sql SQL.
+     * @param args Query arguments.
+     * @return Query results.
+     */
+    protected List<List<?>> executeSql(String sql, Object... args) {
+        if (args == null || args.length == 0)
+            System.out.println(">>> " + sql);
+        else
+            System.out.println(">>> " + sql + " " + Arrays.toString(args));
+
+        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
+
+        if (args != null && args.length > 0)
+            qry.setArgs(args);
+
+        return executeSqlFieldsQuery(qry);
+    }
+
+    /**
+     * Execute prepared SQL fields query.
+     *
+     * @param qry Query.
+     * @return Result.
+     */
+    protected List<List<?>> executeSqlFieldsQuery(SqlFieldsQuery qry) {
+        return client().context().query().querySqlFields(qry, false).getAll();
+    }
+
+    /**
+     * @return Client node.
+     */
+    protected IgniteEx client() {
+        return grid(CLI_NAME);
+    }
+
+    /**
+     * Clear intercepted IO state (request count, partitions, nodes).
+     */
+    protected static void clearIoState() {
+        INTERCEPTED_REQS.set(0);
+        INTERCEPTED_PARTS.clear();
+        INTERCEPTED_NODES.clear();
+    }
+
+    /**
+     * Make sure that expected partitions are logged.
+     *
+     * @param expParts Expected partitions.
+     */
+    protected static void assertPartitions(int... expParts) {
+        Collection<Integer> expParts0 = new TreeSet<>();
+
+        for (int expPart : expParts)
+            expParts0.add(expPart);
+
+        assertPartitions(expParts0);
+    }
+
+    /**
+     * Make sure that expected partitions are logged.
+     *
+     * @param expParts Expected partitions.
+     */
+    protected static void assertPartitions(Collection<Integer> expParts) {
+        TreeSet<Integer> expParts0 = new TreeSet<>(expParts);
+        TreeSet<Integer> actualParts = new TreeSet<>(INTERCEPTED_PARTS);
+
+        assertEquals("Unexpected partitions [exp=" + expParts + ", actual=" + actualParts + ']',
+            expParts0, actualParts);
+    }
+
+    /**
+     * Make sure that no partitions were extracted.
+     */
+    protected static void assertNoPartitions() {
+        assertTrue("No requests were sent.", INTERCEPTED_REQS.get() > 0);
+        assertTrue("Partitions are not empty: " + INTERCEPTED_PARTS, INTERCEPTED_PARTS.isEmpty());
+    }
+
+    /**
+     * Make sure no requests were sent because an empty partition set was determined.
+     */
+    protected static void assertNoRequests() {
+        assertEquals("Requests were sent: " + INTERCEPTED_REQS.get(), 0, INTERCEPTED_REQS.get());
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @return Partition.
+     */
+    protected int partition(String cacheName, Object key) {
+        return client().affinity(cacheName).partition(key);
+    }
+
+    /**
+     * Make sure that expected nodes are logged.
+     *
+     * @param expNodes Expected nodes.
+     */
+    protected static void assertNodes(ClusterNode... expNodes) {
+        Collection<ClusterNode> expNodes0 = new TreeSet<>();
+
+        for (ClusterNode expNode : expNodes)
+            expNodes0.add(expNode);
+
+        assertNodes(expNodes0);
+    }
+
+    /**
+     * Make sure that expected nodes are logged.
+     *
+     * @param expNodes Expected nodes.
+     */
+    protected static void assertNodes(Collection<ClusterNode> expNodes) {
+        TreeSet<ClusterNode> expNodes0 = new TreeSet<>(expNodes);
+        TreeSet<ClusterNode> actualNodes = new TreeSet<>(INTERCEPTED_NODES);
+
+        assertEquals("Unexpected nodes [exp=" + expNodes + ", actual=" + actualNodes + ']',
+            expNodes0, actualNodes);
+    }
+
+    /**
+     * @param cacheName Cache name.
+     * @param key Key.
+     * @return Node.
+     */
+    protected ClusterNode node(String cacheName, Object key) {
+        return client().affinity(cacheName).mapKeyToNode(key);
+    }
+
+    /**
+     * TCP communication SPI which will track outgoing query requests.
+     */
+    private static class TrackingTcpCommunicationSpi extends TcpCommunicationSpi {
+        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
+            if (msg instanceof GridIoMessage) {
+                GridIoMessage msg0 = (GridIoMessage)msg;
+
+                if (msg0.message() instanceof GridH2QueryRequest) {
+                    INTERCEPTED_NODES.add(node);
+                    INTERCEPTED_REQS.incrementAndGet();
+
+                    GridH2QueryRequest req = (GridH2QueryRequest)msg0.message();
+
+                    int[] parts = req.queryPartitions();
+
+                    if (!F.isEmpty(parts)) {
+                        for (int part : parts)
+                            INTERCEPTED_PARTS.add(part);
+                    }
+                }
+                else if (msg0.message() instanceof GridNearTxQueryEnlistRequest) {
+                    INTERCEPTED_NODES.add(node);
+                    INTERCEPTED_REQS.incrementAndGet();
+
+                    GridNearTxQueryEnlistRequest req = (GridNearTxQueryEnlistRequest)msg0.message();
+
+                    int[] parts = req.partitions();
+
+                    if (!F.isEmpty(parts)) {
+                        for (int part : parts)
+                            INTERCEPTED_PARTS.add(part);
+                    }
+                }
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+
+    /**
+     * @param name Name.
+     * @return PK column.
+     */
+    public Column pkColumn(String name) {
+        return new Column(name, true, false);
+    }
+
+    /**
+     * @param name Name.
+     * @return Affinity column.
+     */
+    public Column affinityColumn(String name) {
+        return new Column(name, true, true);
+    }
+
+    /**
+     * Column.
+     */
+    private static class Column {
+        /** Name. */
+        private final String name;
+
+        /** PK. */
+        private final boolean pk;
+
+        /** Affinity key. */
+        private final boolean aff;
+
+        /**
+         * Constructor.
+         *
+         * @param name Name.
+         * @param pk PK flag.
+         * @param aff Affinity flag.
+         */
+        public Column(String name, boolean pk, boolean aff) {
+            this.name = name;
+            this.pk = pk;
+            this.aff = aff;
+        }
+
+        /**
+         * @return Name.
+         */
+        public String name() {
+            return name;
+        }
+
+        /**
+         * @return PK flag.
+         */
+        public boolean pk() {
+            return pk;
+        }
+
+        /**
+     * @return Affinity flag.
+         */
+        public boolean affinity() {
+            return aff;
+        }
+    }
+}
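
A note on the combination logic in execute()/executeCombinations0() above: the query is first run with all placeholders bound, then each argument is recursively inlined into the SQL text so that every mix of bound and inlined parameters is covered. An illustrative trace (values are made up for the example):

    // For "SELECT * FROM t1 WHERE k1 in (?, ?)" with args "1", "2",
    // the following statements are executed:
    //
    //   SELECT * FROM t1 WHERE k1 in (?, ?)   -- args: "1", "2" (as-is)
    //   SELECT * FROM t1 WHERE k1 in (1, ?)   -- args: "2"
    //   SELECT * FROM t1 WHERE k1 in (?, 2)   -- args: "1"
    //   SELECT * FROM t1 WHERE k1 in (1, 2)   -- no args
    //
    // Each distinct inlined SQL string runs once (deduplicated through
    // the executedSqls set), and the result consumer re-checks the
    // pruning assertions for every combination.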
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DmlSelectPartitionPruningSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DmlSelectPartitionPruningSelfTest.java
new file mode 100644
index 0000000..927cc3d
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/DmlSelectPartitionPruningSelfTest.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query.h2.twostep;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.binary.BinaryObject;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.List;
+
+/**
+ * Tests for partition pruning at the SELECT step of UPDATE/DELETE statement execution.
+ */
+@SuppressWarnings("deprecation")
+@RunWith(JUnit4.class)
+public class DmlSelectPartitionPruningSelfTest extends AbstractPartitionPruningBaseTest {
+    /** Rows count for test tables. */
+    private static final int ROWS = 10;
+
+    /** Recreate tables before each test statement. */
+    private boolean recreateTables;
+
+    /**
+     * Test UPDATE statement.
+     */
+    @Test
+    public void testUpdate() {
+        recreateTables = false;
+
+        recreateTables();
+
+        // Key (not alias).
+        execute("UPDATE t1 SET v1 = 'new1' WHERE k1 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "1")
+                );
+                assertUpdatedRows(res, 1);
+            },
+            "1"
+        );
+
+        // Key (alias).
+        execute("UPDATE t1 SET v1 = 'new2' WHERE _KEY = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "2")
+                );
+                assertUpdatedRows(res, 1);
+            },
+            "2"
+        );
+
+        // Non-affinity key.
+        execute("UPDATE t2 SET v2 = 'new1' WHERE k2 = ?",
+            (res) -> {
+                assertNoPartitions();
+                assertUpdatedRows(res, 1);
+            },
+            "1"
+        );
+
+        // Affinity key.
+        execute("UPDATE t2 SET v2 = 'new1' WHERE ak2 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t2", "1")
+                );
+                assertUpdatedRows(res, 1);
+            },
+            "1"
+        );
+
+        // Expression: condition IN (...)
+        execute("UPDATE t1 SET v1 = 'new1' WHERE k1 in (?, ?, ?)",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "1"),
+                    partition("t1", "2"),
+                    partition("t1", "3")
+                );
+                assertUpdatedRows(res, 3);
+            },
+            "1", "2", "3"
+        );
+
+        // Expression: logical OR
+        execute("UPDATE t1 SET v1 = 'new1' WHERE k1 in (?, ?) or k1 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "1"),
+                    partition("t1", "2"),
+                    partition("t1", "3")
+                );
+                assertUpdatedRows(res, 3);
+            },
+            "3", "2", "1"
+        );
+
+        // No request (empty partitions).
+        execute("UPDATE t1 SET v1 = 'new1' WHERE k1 in (?, ?) and k1 = ?",
+            (res) -> {
+                assertNoRequests();
+                assertUpdatedRows(res, 0);
+            },
+            "3", "2", "1"
+        );
+
+        // Complex key.
+        BinaryObject key = client().binary().builder("t2_key")
+            .setField("k1", "5")
+            .setField("ak2", "5")
+            .build();
+
+        List<List<?>> res = executeSingle("UPDATE t2 SET v2 = 'new1' WHERE _KEY = ?", key);
+        assertPartitions(
+            partition("t2", "5")
+        );
+        assertUpdatedRows(res, 1);
+    }
+
+    /**
+     * Test DELETE statement.
+     */
+    @Test
+    public void testDelete() {
+        recreateTables = true;
+
+        // Affinity key.
+        execute("DELETE FROM t2 WHERE ak2 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t2", "2")
+                );
+                assertUpdatedRows(res, 1);
+            },
+            "2"
+        );
+
+        // Expression: condition IN (...)
+        execute("DELETE FROM t1 WHERE k1 in (?, ?, ?)",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "1"),
+                    partition("t1", "2"),
+                    partition("t1", "3")
+                );
+                assertUpdatedRows(res, 3);
+            },
+            "1", "2", "3"
+        );
+
+        // Expression: logical OR
+        execute("DELETE FROM t1 WHERE k1 in (?, ?) or k1 = ?",
+            (res) -> {
+                assertPartitions(
+                    partition("t1", "1"),
+                    partition("t1", "2"),
+                    partition("t1", "3")
+                );
+                assertUpdatedRows(res, 3);
+            },
+            "3", "2", "1"
+        );
+
+        // No request (empty partitions).
+        execute("DELETE FROM t1  WHERE k1 in (?, ?) and k1 = ?",
+            (res) -> {
+                assertNoRequests();
+                assertUpdatedRows(res, 0);
+            },
+            "3", "2", "1"
+        );
+    }
+
+    /**
+     * Drop, create and fill test tables.
+     */
+    private void recreateTables() {
+        Ignite cli = client();
+
+        cli.destroyCaches(cli.cacheNames());
+
+        createPartitionedTable("t1",
+            pkColumn("k1"),
+            "v1");
+
+        createPartitionedTable("t2",
+            pkColumn("k2"),
+            affinityColumn("ak2"),
+            "v2");
+
+        for (int i = 0; i < ROWS; ++i) {
+            executeSql("INSERT INTO t1 VALUES (?, ?)",
+                Integer.toString(i), Integer.toString(i));
+
+            executeSql("INSERT INTO t2 VALUES (?, ?, ?)",
+                Integer.toString(i), Integer.toString(i), Integer.toString(i));
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected List<List<?>> executeSingle(String sql, Object... args) {
+        if (recreateTables)
+            recreateTables();
+
+        return super.executeSingle(sql, args);
+    }
+
+    /**
+     * @param res Update query result.
+     * @param expUpdated Expected number of updated rows.
+     */
+    private static void assertUpdatedRows(List<List<?>> res, long expUpdated) {
+        assertEquals(1, res.size());
+        assertEquals(expUpdated, res.get(0).get(0));
+    }
+}
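
The pattern the new DML suite follows is compact enough to restate: create tables through the base-class helpers, run a statement through execute(), and assert on the intercepted partitions. A minimal hypothetical subclass built on the same helpers (a sketch, not part of this commit):

    import org.junit.Test;

    /** Hypothetical test built on AbstractPartitionPruningBaseTest. */
    public class SelectPartitionPruningExampleTest extends AbstractPartitionPruningBaseTest {
        /** Single-key SELECT should prune to exactly one partition. */
        @Test
        public void testSelectByKey() {
            createPartitionedTable("t1",
                pkColumn("k1"),
                "v1");

            executeSql("INSERT INTO t1 VALUES (?, ?)", "1", "1");

            execute("SELECT * FROM t1 WHERE k1 = ?",
                (res) -> assertPartitions(partition("t1", "1")),
                "1");
        }
    }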
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java
index dd03a77..e9e4d13 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/h2/twostep/JoinPartitionPruningSelfTest.java
@@ -17,8 +17,9 @@
 
 package org.apache.ignite.internal.processors.query.h2.twostep;
 
+import java.util.Collections;
+import java.util.List;
 import org.apache.ignite.Ignite;
-import org.apache.ignite.IgniteException;
 import org.apache.ignite.binary.BinaryObject;
 import org.apache.ignite.cache.CacheKeyConfiguration;
 import org.apache.ignite.cache.CacheMode;
@@ -28,106 +29,23 @@ import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
-import org.apache.ignite.configuration.DataRegionConfiguration;
-import org.apache.ignite.configuration.DataStorageConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.processors.query.QueryUtils;
-import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2QueryRequest;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
-import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
-import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
 
 /**
  * Tests for join partition pruning.
  */
 @SuppressWarnings("deprecation")
-public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest {
-    /** Number of intercepted requests. */
-    private static final AtomicInteger INTERCEPTED_REQS = new AtomicInteger();
-
-    /** Parititions tracked during query execution. */
-    private static final ConcurrentSkipListSet<Integer> INTERCEPTED_PARTS = new ConcurrentSkipListSet<>();
-
-    /** IP finder. */
-    private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder().setShared(true);
-
-    /** Client node name. */
-    private static final String CLI_NAME = "cli";
-
-    /** Memory. */
-    private static final String REGION_MEM = "mem";
-
-    /** Disk. */
-    private static final String REGION_DISK = "disk";
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        cleanPersistenceDir();
-
-        startGrid(getConfiguration("srv1"));
-        startGrid(getConfiguration("srv2"));
-        startGrid(getConfiguration("srv3"));
-
-        startGrid(getConfiguration(CLI_NAME).setClientMode(true));
-
-        client().cluster().active(true);
-    }
-
+@RunWith(JUnit4.class)
+public class JoinPartitionPruningSelfTest extends AbstractPartitionPruningBaseTest {
     /** {@inheritDoc} */
     @Override protected void beforeTest() throws Exception {
-        clearIoState();
-
-        Ignite cli = client();
-
-        cli.destroyCaches(cli.cacheNames());
-    }
+        super.beforeTest();
 
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-
-        cleanPersistenceDir();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String name) throws Exception {
-        IgniteConfiguration res = super.getConfiguration(name);
-
-        res.setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(IP_FINDER));
-        res.setCommunicationSpi(new TrackingTcpCommunicationSpi());
-
-        res.setLocalHost("127.0.0.1");
-
-        DataRegionConfiguration memRegion =
-            new DataRegionConfiguration().setName(REGION_MEM).setPersistenceEnabled(false);
-
-        DataRegionConfiguration diskRegion =
-            new DataRegionConfiguration().setName(REGION_DISK).setPersistenceEnabled(true);
-
-        res.setDataStorageConfiguration(new DataStorageConfiguration().setDataRegionConfigurations(diskRegion)
-            .setDefaultDataRegionConfiguration(memRegion));
-
-        return res;
+        clearIoState();
     }
 
     /**
@@ -144,20 +62,20 @@ public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest {
             affinityColumn("ak2"),
             "v3");
 
-        executeSingle("INSERT INTO t1 VALUES ('1', '1')");
-        executeSingle("INSERT INTO t2 VALUES ('1', '1', '1')");
+        executeSql("INSERT INTO t1 VALUES ('1', '1')");
+        executeSql("INSERT INTO t2 VALUES ('1', '1', '1')");
 
-        executeSingle("INSERT INTO t1 VALUES ('2', '2')");
+        executeSql("INSERT INTO t1 VALUES ('2', '2')");
         executeSingle("INSERT INTO t2 VALUES ('2', '2', '2')");
 
-        executeSingle("INSERT INTO t1 VALUES ('3', '3')");
-        executeSingle("INSERT INTO t2 VALUES ('3', '3', '3')");
+        executeSql("INSERT INTO t1 VALUES ('3', '3')");
+        executeSql("INSERT INTO t2 VALUES ('3', '3', '3')");
 
-        executeSingle("INSERT INTO t1 VALUES ('4', '4')");
-        executeSingle("INSERT INTO t2 VALUES ('4', '4', '4')");
+        executeSql("INSERT INTO t1 VALUES ('4', '4')");
+        executeSql("INSERT INTO t2 VALUES ('4', '4', '4')");
 
-        executeSingle("INSERT INTO t1 VALUES ('5', '5')");
-        executeSingle("INSERT INTO t2 VALUES ('5', '5', '5')");
+        executeSql("INSERT INTO t1 VALUES ('5', '5')");
+        executeSql("INSERT INTO t2 VALUES ('5', '5', '5')");
 
         // Key (not alias).
         execute("SELECT * FROM t1 INNER JOIN t2 ON t1.k1 = t2.ak2 WHERE t1.k1 = ?",
@@ -374,14 +292,14 @@ public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest {
             affinityColumn("ak2"),
             "v3");
 
-        executeSingle("INSERT INTO t1 VALUES ('1', '1')");
-        executeSingle("INSERT INTO t2 VALUES ('1', '1', '1')");
+        executeSql("INSERT INTO t1 VALUES ('1', '1')");
+        executeSql("INSERT INTO t2 VALUES ('1', '1', '1')");
 
-        executeSingle("INSERT INTO t1 VALUES ('2', '2')");
-        executeSingle("INSERT INTO t2 VALUES ('2', '2', '2')");
+        executeSql("INSERT INTO t1 VALUES ('2', '2')");
+        executeSql("INSERT INTO t2 VALUES ('2', '2', '2')");
 
-        executeSingle("INSERT INTO t1 VALUES ('3', '3')");
-        executeSingle("INSERT INTO t2 VALUES ('3', '3', '3')");
+        executeSql("INSERT INTO t1 VALUES ('3', '3')");
+        executeSql("INSERT INTO t2 VALUES ('3', '3', '3')");
 
         // Left table, should work.
         execute("SELECT * FROM t1, t2 WHERE t1.k1 = ?",
@@ -670,6 +588,11 @@ public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest {
         );
     }
 
+    /**
+     * @param ccfg1 Cache config 1.
+     * @param ccfg2 Cache config 2.
+     * @param compatible Compatible affinity function flag (false when affinity is incompatible).
+     */
     @SuppressWarnings("unchecked")
     private void checkAffinityFunctions(CacheConfiguration ccfg1, CacheConfiguration ccfg2, boolean compatible) {
         // Destroy old caches.
@@ -738,6 +661,7 @@ public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest {
      * @param persistent Whether to enable persistence.
      * @return Cache configuration.
      */
+    @SuppressWarnings("IfMayBeConditional")
     private static CacheConfiguration cacheConfiguration(
         int parts,
         int backups,
@@ -898,357 +822,6 @@ public class JoinPartitionPruningSelfTest extends GridCommonAbstractTest {
     }
 
     /**
-     * Create PARTITIONED table.
-     *
-     * @param name Name.
-     * @param cols Columns.
-     */
-    private void createPartitionedTable(String name, Object... cols) {
-        createTable0(name, false, cols);
-    }
-
-    /**
-     * Create REPLICATED table.
-     *
-     * @param name Name.
-     * @param cols Columns.
-     */
-    @SuppressWarnings("SameParameterValue")
-    private void createReplicatedTable(String name, Object... cols) {
-        createTable0(name, true, cols);
-    }
-
-    /**
-     * Internal CREATE TABLE routine.
-     *
-     * @param name Name.
-     * @param replicated Replicated table flag.
-     * @param cols Columns.
-     */
-    @SuppressWarnings("StringConcatenationInsideStringBufferAppend")
-    private void createTable0(String name, boolean replicated, Object... cols) {
-        List<String> pkCols = new ArrayList<>();
-
-        String affCol = null;
-
-        StringBuilder sql = new StringBuilder("CREATE TABLE ").append(name).append("(");
-        for (Object col : cols) {
-            Column col0 = col instanceof Column ? (Column)col : new Column((String)col, false, false);
-
-            sql.append(col0.name()).append(" VARCHAR, ");
-
-            if (col0.pk())
-                pkCols.add(col0.name());
-
-            if (col0.affinity()) {
-                if (affCol != null)
-                    throw new IllegalStateException("Only one affinity column is allowed: " + col0.name());
-
-                affCol = col0.name();
-            }
-        }
-
-        if (pkCols.isEmpty())
-            throw new IllegalStateException("No PKs!");
-
-        sql.append("PRIMARY KEY (");
-
-        boolean firstPkCol = true;
-
-        for (String pkCol : pkCols) {
-            if (firstPkCol)
-                firstPkCol = false;
-            else
-                sql.append(", ");
-
-            sql.append(pkCol);
-        }
-
-        sql.append(")");
-
-        sql.append(") WITH \"template=" + (replicated ? "replicated" : "partitioned"));
-        sql.append(", CACHE_NAME=" + name);
-
-        if (affCol != null) {
-            sql.append(", AFFINITY_KEY=" + affCol);
-            sql.append(", KEY_TYPE=" + name + "_key");
-        }
-
-        sql.append("\"");
-
-        executeSingle(sql.toString());
-    }
-
-    /**
-     * Execute query with all possible combinations of argument placeholders.
-     *
-     * @param sql SQL.
-     * @param resConsumer Result consumer.
-     * @param args Arguments.
-     */
-    public void execute(String sql, Consumer<List<List<?>>> resConsumer, Object... args) {
-        System.out.println(">>> TEST COMBINATION: " + sql);
-
-        // Execute query as is.
-        List<List<?>> res = executeSingle(sql, args);
-
-        resConsumer.accept(res);
-
-        // Start filling arguments recursively.
-        if (args != null && args.length > 0)
-            executeCombinations0(sql, resConsumer, new HashSet<>(), args);
-
-        System.out.println();
-    }
-
-    /**
-     * Execute query with all possible combinations of argument placeholders.
-     *
-     * @param sql SQL.
-     * @param resConsumer Result consumer.
-     * @param executedSqls Already executed SQLs.
-     * @param args Arguments.
-     */
-    public void executeCombinations0(
-        String sql,
-        Consumer<List<List<?>>> resConsumer,
-        Set<String> executedSqls,
-        Object... args
-    ) {
-        assert args != null && args.length > 0;
-
-        // Get argument positions.
-        List<Integer> paramPoss = new ArrayList<>();
-
-        int pos = 0;
-
-        while (true) {
-            int paramPos = sql.indexOf('?', pos);
-
-            if (paramPos == -1)
-                break;
-
-            paramPoss.add(paramPos);
-
-            pos = paramPos + 1;
-        }
-
-        for (int i = 0; i < args.length; i++) {
-            // Prepare new SQL and arguments.
-            int paramPos = paramPoss.get(i);
-
-            String newSql = sql.substring(0, paramPos) + args[i] + sql.substring(paramPos + 1);
-
-            Object[] newArgs = new Object[args.length - 1];
-
-            int newArgsPos = 0;
-
-            for (int j = 0; j < args.length; j++) {
-                if (j != i)
-                    newArgs[newArgsPos++] = args[j];
-            }
-
-            // Execute if this combination was never executed before.
-            if (executedSqls.add(newSql)) {
-                List<List<?>> res = executeSingle(newSql, newArgs);
-
-                resConsumer.accept(res);
-            }
-
-            // Continue recursively.
-            if (newArgs.length > 0)
-                executeCombinations0(newSql, resConsumer, executedSqls, newArgs);
-        }
-    }
-
-    /**
-     * Execute SQL query.
-     *
-     * @param sql SQL.
-     */
-    private List<List<?>> executeSingle(String sql, Object... args) {
-        clearIoState();
-
-        if (args == null || args.length == 0)
-            System.out.println(">>> " + sql);
-        else
-            System.out.println(">>> " + sql + " " + Arrays.toString(args));
-
-        SqlFieldsQuery qry = new SqlFieldsQuery(sql);
-
-        if (args != null && args.length > 0)
-            qry.setArgs(args);
-
-        return executeSqlFieldsQuery(qry);
-    }
-
-    /**
-     * Execute prepared SQL fields query.
-     *
-     * @param qry Query.
-     * @return Result.
-     */
-    private List<List<?>> executeSqlFieldsQuery(SqlFieldsQuery qry) {
-        return client().context().query().querySqlFields(qry, false).getAll();
-    }
-
-    /**
-     * @return Client node.
-     */
-    private IgniteEx client() {
-        return grid(CLI_NAME);
-    }
-
-    /**
-     * Clear partitions.
-     */
-    private static void clearIoState() {
-        INTERCEPTED_REQS.set(0);
-        INTERCEPTED_PARTS.clear();
-    }
-
-    /**
-     * Make sure that expected partitions are logged.
-     *
-     * @param expParts Expected partitions.
-     */
-    private static void assertPartitions(int... expParts) {
-        Collection<Integer> expParts0 = new TreeSet<>();
-
-        for (int expPart : expParts)
-            expParts0.add(expPart);
-
-        assertPartitions(expParts0);
-    }
-
-    /**
-     * Make sure that expected partitions are logged.
-     *
-     * @param expParts Expected partitions.
-     */
-    private static void assertPartitions(Collection<Integer> expParts) {
-        TreeSet<Integer> expParts0 = new TreeSet<>(expParts);
-        TreeSet<Integer> actualParts = new TreeSet<>(INTERCEPTED_PARTS);
-
-        assertEquals("Unexpected partitions [exp=" + expParts + ", actual=" + actualParts + ']',
-            expParts0, actualParts);
-    }
-
-    /**
-     * Make sure that no partitions were extracted.
-     */
-    private static void assertNoPartitions() {
-        assertTrue("No requests were sent.", INTERCEPTED_REQS.get() > 0);
-        assertTrue("Partitions are not empty: " + INTERCEPTED_PARTS, INTERCEPTED_PARTS.isEmpty());
-    }
-
-    /**
-     * Make sure there were no requests sent because we determined empty partition set.
-     */
-    private static void assertNoRequests() {
-        assertEquals("Requests were sent: " + INTERCEPTED_REQS.get(), 0, INTERCEPTED_REQS.get());
-    }
-
-    /**
-     * @param cacheName Cache name.
-     * @param key Key.
-     * @return Partition.
-     */
-    private int partition(String cacheName, Object key) {
-        return client().affinity(cacheName).partition(key);
-    }
-
-    /**
-     * TCP communication SPI which will track outgoing query requests.
-     */
-    private static class TrackingTcpCommunicationSpi extends TcpCommunicationSpi {
-        @Override public void sendMessage(ClusterNode node, Message msg, IgniteInClosure<IgniteException> ackC) {
-            if (msg instanceof GridIoMessage) {
-                GridIoMessage msg0 = (GridIoMessage)msg;
-
-                if (msg0.message() instanceof GridH2QueryRequest) {
-                    INTERCEPTED_REQS.incrementAndGet();
-
-                    GridH2QueryRequest req = (GridH2QueryRequest)msg0.message();
-
-                    int[] parts = req.queryPartitions();
-
-                    if (!F.isEmpty(parts)) {
-                        for (int part : parts)
-                            INTERCEPTED_PARTS.add(part);
-                    }
-                }
-            }
-
-            super.sendMessage(node, msg, ackC);
-        }
-    }
-
-    /**
-     * @param name Name.
-     * @return PK column.
-     */
-    public Column pkColumn(String name) {
-        return new Column(name, true, false);
-    }
-
-    /**
-     * @param name Name.
-     * @return Affintiy column.
-     */
-    public Column affinityColumn(String name) {
-        return new Column(name, true, true);
-    }
-
-    /**
-     * Column.
-     */
-    private static class Column {
-        /** Name. */
-        private final String name;
-
-        /** PK. */
-        private final boolean pk;
-
-        /** Affinity key. */
-        private final boolean aff;
-
-        /**
-         * Constructor.
-         *
-         * @param name Name.
-         * @param pk PK flag.
-         * @param aff Affinity flag.
-         */
-        public Column(String name, boolean pk, boolean aff) {
-            this.name = name;
-            this.pk = pk;
-            this.aff = aff;
-        }
-
-        /**
-         * @return Name.
-         */
-        public String name() {
-            return name;
-        }
-
-        /**
-         * @return PK flag.
-         */
-        public boolean pk() {
-            return pk;
-        }
-
-        /**
-         * @return Affintiy flag.
-         */
-        public boolean affinity() {
-            return aff;
-        }
-    }
-
-    /**
      * Custom affinity function.
      */
     private static class CustomRendezvousAffinityFunction extends RendezvousAffinityFunction {
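
One behavioral addition worth calling out in this refactoring: the extracted base class tracks target nodes (INTERCEPTED_NODES, assertNodes(), node()) in addition to partitions, which the old self-contained JoinPartitionPruningSelfTest did not. A hedged sketch of how a test body could use the new node-level helpers (table setup as in the examples above):

    // Inside a test method of an AbstractPartitionPruningBaseTest subclass:
    // verify both the partition and the node the request was routed to.
    execute("SELECT * FROM t1 WHERE k1 = ?",
        (res) -> {
            assertPartitions(partition("t1", "1"));
            assertNodes(node("t1", "1"));
        },
        "1");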
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
index 5798ade..4482b612c 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite.java
@@ -220,6 +220,7 @@ import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryDistr
 import org.apache.ignite.internal.processors.query.h2.sql.H2CompareBigQueryTest;
 import org.apache.ignite.internal.processors.query.h2.twostep.AndOperationExtractPartitionSelfTest;
 import org.apache.ignite.internal.processors.query.h2.twostep.BetweenOperationExtractPartitionSelfTest;
+import org.apache.ignite.internal.processors.query.h2.twostep.DmlSelectPartitionPruningSelfTest;
 import org.apache.ignite.internal.processors.query.h2.twostep.InOperationExtractPartitionSelfTest;
 import org.apache.ignite.internal.processors.query.h2.twostep.JoinPartitionPruningSelfTest;
 import org.apache.ignite.internal.processors.sql.IgniteCachePartitionedAtomicColumnConstraintsTest;
@@ -539,6 +540,7 @@ import org.junit.runners.Suite;
     AndOperationExtractPartitionSelfTest.class,
     BetweenOperationExtractPartitionSelfTest.class,
     JoinPartitionPruningSelfTest.class,
+    DmlSelectPartitionPruningSelfTest.class,
 
     GridCacheDynamicLoadOnClientTest.class,
     GridCacheDynamicLoadOnClientPersistentTest.class,
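
Once registered in IgniteBinaryCacheQueryTestSuite, the new test also runs standalone; a minimal sketch using plain JUnit 4 (assuming the usual Ignite test classpath is available):

    import org.apache.ignite.internal.processors.query.h2.twostep.DmlSelectPartitionPruningSelfTest;
    import org.junit.runner.JUnitCore;
    import org.junit.runner.Result;

    public class RunPruningTest {
        public static void main(String[] args) {
            // Run the single test class outside the full suite.
            Result res = JUnitCore.runClasses(DmlSelectPartitionPruningSelfTest.class);

            System.out.println("Failures: " + res.getFailureCount());
        }
    }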