You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain-text copy.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/01/15 09:40:31 UTC

[ignite] branch sql-calcite updated: IGNITE-13915 Calcite improvements. Fix fragment ID on mapping, extend tests coverage, use both client and server for starting queries (#8629)

This is an automated email from the ASF dual-hosted git repository.

tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/sql-calcite by this push:
     new 7a09246  IGNITE-13915 Calcite improvements. Fix fragment ID on mapping, extend tests coverage, use both client and server for starting queries (#8629)
7a09246 is described below

commit 7a09246b4cdc80b568c4eec838dcd40492f39822
Author: zstan <st...@gmail.com>
AuthorDate: Fri Jan 15 12:40:09 2021 +0300

    IGNITE-13915 Calcite improvements. Fix fragment ID on mapping, extend tests coverage, use both client and server for starting queries (#8629)
---
 .../processors/query/calcite/exec/rel/Inbox.java   |   5 +
 .../processors/query/calcite/prepare/Fragment.java |  10 --
 .../query/calcite/prepare/QueryTemplate.java       |   2 +-
 .../query/calcite/CalciteQueryProcessorTest.java   | 160 +++++++++++----------
 4 files changed, 94 insertions(+), 83 deletions(-)

diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index c261b79..7bdc0ac 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -108,6 +108,9 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
      * @param comp Optional comparator for merge exchange.
      */
     public void init(ExecutionContext<Row> ctx, RelDataType rowType, Collection<UUID> srcNodeIds, @Nullable Comparator<Row> comp) {
+        assert context().fragmentId() == ctx.fragmentId() : "different fragments unsupported: previous=" + context().fragmentId() +
+            " current=" + ctx.fragmentId();
+
         // It's important to set proper context here because
         // the one, that is created on a first message
         // received doesn't have all context variables in place.
@@ -187,6 +190,8 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
     /** */
     private void doPush() {
         try {
+            checkState();
+
             push();
         }
         catch (Exception e) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 90a5a5f..9db3bf1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -67,16 +67,6 @@ public class Fragment {
         this(id, root, remotes, null, null);
     }
 
-    /**
-     * @param id Fragment id.
-     * @param root Root node of the fragment.
-     * @param remotes Remote sources of the fragment.
-     * @param rootSer Root serialized representation.
-     */
-    public Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer) {
-        this(id, root, remotes, rootSer, null);
-    }
-
     /** */
     Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer, @Nullable FragmentMapping mapping) {
         this.id = id;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
index 583385c..5689994 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
@@ -119,7 +119,7 @@ public class QueryTemplate {
                     sender = new IgniteSender(sender.getCluster(), sender.getTraitSet(),
                         sender.getInput(), sender.exchangeId(), newTargetId, sender.distribution());
 
-                    fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes(), fragment0.serialized());
+                    fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes());
                 }
             }
 
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 3f7e01a..4fc05bb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite;
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -29,6 +30,7 @@ import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.QueryEntity;
 import org.apache.ignite.cache.affinity.AffinityKeyMapped;
 import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
@@ -50,17 +52,17 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_E
 @WithSystemProperty(key = "calcite.debug", value = "false")
 public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     /** */
-    private static IgniteEx ignite;
+    private static IgniteEx client;
 
     /** {@inheritDoc} */
     @Override protected void beforeTestsStarted() throws Exception {
         startGrids(5);
 
-        ignite = startClientGrid();
+        client = startClientGrid();
     }
 
     /** {@inheritDoc} */
-    @Override protected void afterTest() throws Exception {
+    @Override protected void afterTest() {
         for (Ignite ign : G.allGrids()) {
             for (String cacheName : ign.cacheNames())
                 ign.destroyCache(cacheName);
@@ -75,21 +77,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     /** */
     @Test
     public void testCountWithJoin() throws Exception {
-        IgniteCache<Integer, RISK> RISK = ignite.getOrCreateCache(new CacheConfiguration<Integer, RISK>()
+        IgniteCache<Integer, RISK> RISK = client.getOrCreateCache(new CacheConfiguration<Integer, RISK>()
             .setName("RISK")
             .setSqlSchema("PUBLIC")
-            .setQueryEntities(F.asList(new QueryEntity(Integer.class, RISK.class).setTableName("RISK")))
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, RISK.class).setTableName("RISK")
+                .setKeyFields(new HashSet<>(Arrays.asList("TRADEID", "TRADEVER")))))
             .setBackups(1)
         );
 
-        IgniteCache<Integer, TRADE> TRADE = ignite.getOrCreateCache(new CacheConfiguration<Integer, TRADE>()
+        IgniteCache<Integer, TRADE> TRADE = client.getOrCreateCache(new CacheConfiguration<Integer, TRADE>()
             .setName("TRADE")
             .setSqlSchema("PUBLIC")
-            .setQueryEntities(F.asList(new QueryEntity(Integer.class, TRADE.class).setTableName("TRADE")))
+            .setQueryEntities(F.asList(new QueryEntity(Integer.class, TRADE.class).setTableName("TRADE")
+                .setKeyFields(new HashSet<>(Arrays.asList("TRADEID", "TRADEVER")))))
             .setBackups(1)
         );
 
-        IgniteCache<Integer, BATCH> BATCH = ignite.getOrCreateCache(new CacheConfiguration<Integer, BATCH>()
+        IgniteCache<Integer, BATCH> BATCH = client.getOrCreateCache(new CacheConfiguration<Integer, BATCH>()
             .setName("BATCH")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, BATCH.class).setTableName("BATCH")))
@@ -115,47 +119,51 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
         awaitPartitionMapExchange(true, true, null);
 
-        QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class);
-
         // TODO: https://issues.apache.org/jira/browse/IGNITE-13849
         // we have a problem with serialization/deserialization of MergeJoin
-        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
-            "SELECT /*+ DISABLE_RULE('MergeJoinConverter') */ count(*)" +
-                " FROM RISK R," +
-                " TRADE T," +
-                " BATCH B " +
-                "WHERE R.BATCHKEY = B.BATCHKEY " +
-                "AND R.TRADEID = T.TRADEID " +
-                "AND R.TRADEVER = T.TRADEVER " +
-                "AND T.BOOK = 'BOOK' " +
-                "AND B.IS = TRUE");
+        String sqlCalc = "SELECT /*+ DISABLE_RULE('MergeJoinConverter') */ count(*)" +
+            " FROM RISK R," +
+            " TRADE T," +
+            " BATCH B " +
+            "WHERE R.BATCHKEY = B.BATCHKEY " +
+            "AND R.TRADEID = T.TRADEID " +
+            "AND R.TRADEVER = T.TRADEVER " +
+            "AND T.BOOK = 'BOOK' " +
+            "AND B.LS = TRUE";
+
+        // loop for test execution.
+        for (int i = 0; i < 10; i++) {
+            List<List<?>> resLoc = sql(sqlCalc);
+            assertEquals(40L, resLoc.get(0).get(0));
+        }
 
-        List<List<?>> res = query.get(0).getAll();
+        //calcite
+        List<List<?>> res1 = sql(sqlCalc);
 
-        assertEquals(1, res.size());
-        assertEquals(1, res.get(0).size());
-        assertEquals(40L, res.get(0).get(0));
+        assertEquals(1, res1.size());
+        assertEquals(1, res1.get(0).size());
+        assertEquals(40L, res1.get(0).get(0));
     }
 
     /** */
     public static class RISK {
         /** */
         @QuerySqlField
-        public Integer BATCHKEY;
+        public Integer batchKey;
 
         /** */
         @QuerySqlField
-        public Integer TRADEID;
+        public Integer tradeId;
 
         /** */
         @QuerySqlField
-        public Integer TRADEVER;
+        public Integer tradeVer;
 
         /** */
         public RISK(Integer in) {
-            BATCHKEY = in;
-            TRADEID = in;
-            TRADEVER = in;
+            batchKey = in;
+            tradeId = in;
+            tradeVer = in;
         }
     }
 
@@ -163,21 +171,21 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     public static class TRADE {
         /** */
         @QuerySqlField
-        public Integer TRADEID;
+        public Integer tradeId;
 
         /** */
         @QuerySqlField
-        public Integer TRADEVER;
+        public Integer tradeVer;
 
         /** */
         @QuerySqlField
-        public String BOOK;
+        public String book;
 
         /** */
         public TRADE(Integer in) {
-            TRADEID = in;
-            TRADEVER = in;
-            BOOK = (in & 1) != 0 ? "BOOK" : "";
+            tradeId = in;
+            tradeVer = in;
+            book = (in & 1) != 0 ? "BOOK" : "";
         }
     }
 
@@ -185,37 +193,37 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     public static class BATCH {
         /** */
         @QuerySqlField
-        public Integer BATCHKEY;
+        public Integer batchKey;
 
         /** */
         @QuerySqlField
-        public Boolean IS;
+        public Boolean ls;
 
         /** */
         public BATCH(Integer in) {
-            BATCHKEY = in;
-            IS = (in & 1) != 0;
+            batchKey = in;
+            ls = (in & 1) != 0;
         }
     }
 
     /** */
     @Test
     public void unionAll() throws Exception {
-        IgniteCache<Integer, Employer> employer1 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> employer1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("employer1")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER1")))
             .setBackups(1)
         );
 
-        IgniteCache<Integer, Employer> employer2 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> employer2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("employer2")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER2")))
             .setBackups(2)
         );
 
-        IgniteCache<Integer, Employer> employer3 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> employer3 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("employer3")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER3")))
@@ -231,39 +239,33 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
         awaitPartitionMapExchange(true, true, null);
 
-        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
-
-        List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
-            "SELECT * FROM employer1 " +
-                "UNION ALL " +
-                "SELECT * FROM employer2 " +
-                "UNION ALL " +
-                "SELECT * FROM employer3 ");
-
-        assertEquals(1, query.size());
+        List<List<?>> rows = sql("SELECT * FROM employer1 " +
+            "UNION ALL " +
+            "SELECT * FROM employer2 " +
+            "UNION ALL " +
+            "SELECT * FROM employer3 ");
 
-        List<List<?>> rows = query.get(0).getAll();
         assertEquals(6, rows.size());
     }
 
     /** */
     @Test
     public void union() throws Exception {
-        IgniteCache<Integer, Employer> employer1 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> employer1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("employer1")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER1")))
             .setBackups(1)
         );
 
-        IgniteCache<Integer, Employer> employer2 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> employer2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("employer2")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER2")))
             .setBackups(2)
         );
 
-        IgniteCache<Integer, Employer> employer3 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> employer3 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("employer3")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER3")))
@@ -296,14 +298,14 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     /** */
     private void populateTables() throws InterruptedException {
-        IgniteCache<Integer, Employer> orders = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> orders = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("orders")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("orders")))
             .setBackups(2)
         );
 
-        IgniteCache<Integer, Employer> account = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> account = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("account")
             .setSqlSchema("PUBLIC")
             .setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("account")))
@@ -405,7 +407,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         populateTables();
 
         List<List<?>> rows = sql(
-                "SELECT distinct(name) FROM Orders o WHERE name IN (" +
+            "SELECT distinct(name) FROM Orders o WHERE name IN (" +
                 "   SELECT name" +
                 "   FROM Account)");
 
@@ -453,7 +455,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         populateTables();
 
         List<List<?>> rows = sql(
-                "SELECT name FROM Orders o WHERE EXISTS (" +
+            "SELECT name FROM Orders o WHERE EXISTS (" +
                 "   SELECT 1" +
                 "   FROM Account a" +
                 "   WHERE o.name = a.name)");
@@ -468,7 +470,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         populateTables();
 
         List<List<?>> rows = sql(
-                "EXPLAIN PLAN FOR SELECT name FROM Orders o WHERE NOT EXISTS (" +
+            "EXPLAIN PLAN FOR SELECT name FROM Orders o WHERE NOT EXISTS (" +
                 "   SELECT 1" +
                 "   FROM Account a" +
                 "   WHERE o.name = a.name)");
@@ -484,7 +486,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
         assertEquals(3, rows.size());
 
         rows = sql(
-                "SELECT distinct(name) FROM Orders o WHERE NOT EXISTS (" +
+            "SELECT distinct(name) FROM Orders o WHERE NOT EXISTS (" +
                 "   SELECT name" +
                 "   FROM Account a" +
                 "   WHERE o.name = a.name)");
@@ -517,7 +519,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     /** */
     @Test
     public void aggregate() throws Exception {
-        IgniteCache<Integer, Employer> employer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+        IgniteCache<Integer, Employer> employer = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
             .setName("employer")
             .setSqlSchema("PUBLIC")
             .setIndexedTypes(Integer.class, Employer.class)
@@ -694,7 +696,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
     /** */
     @Test
     public void testInsertUpdateDeleteNonPrimitiveKey() throws Exception {
-        IgniteCache<Key, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Key, Developer>()
+        IgniteCache<Key, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Key, Developer>()
             .setName("developer")
             .setSqlSchema("PUBLIC")
             .setIndexedTypes(Key.class, Developer.class)
@@ -766,7 +768,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     /** for test purpose only. */
     public void testThroughput() {
-        IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
+        IgniteCache<Integer, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
             .setCacheMode(CacheMode.REPLICATED)
             .setName("developer")
             .setSqlSchema("PUBLIC")
@@ -785,7 +787,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
             developer.put(i, new Developer("Name" + i, prId));
         }
 
-        QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class);
+        QueryEngine engine = Commons.lookupComponent(client.context(), QueryEngine.class);
 
         // warmup
         for (int i = 0; i < numIterations; i++) {
@@ -802,13 +804,13 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
         // warmup
         for (int i = 0; i < numIterations; i++) {
-            List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
+            List<FieldsQueryCursor<List<?>>> query = client.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
             query.get(0).getAll();
         }
 
         start = System.currentTimeMillis();
         for (int i = 0; i < numIterations; i++) {
-            List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
+            List<FieldsQueryCursor<List<?>>> query = client.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
             query.get(0).getAll();
         }
         System.out.println("H2 duration = " + (System.currentTimeMillis() - start));
@@ -816,11 +818,25 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
 
     /** */
     private List<List<?>> sql(String sql) {
-        QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+        QueryEngine engineSrv = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+        assertTrue(client.configuration().isClientMode());
+
+        QueryEngine engineCli = Commons.lookupComponent(client.context(), QueryEngine.class);
 
-        List<FieldsQueryCursor<List<?>>> cursors = engine.query(null, "PUBLIC", sql);
+        List<FieldsQueryCursor<List<?>>> cursorsSrv = engineSrv.query(null, "PUBLIC", sql);
+
+        List<FieldsQueryCursor<List<?>>> cursorsCli = engineCli.query(null, "PUBLIC", sql);
+
+        List<List<?>> allSrv;
+
+        try (QueryCursor srvCursor = cursorsSrv.get(0); QueryCursor cliCursor = cursorsCli.get(0)) {
+            allSrv = srvCursor.getAll();
+
+            assertEquals(allSrv.size(), cliCursor.getAll().size());
+        }
 
-        return cursors.get(0).getAll();
+        return allSrv;
     }
 
     /** */