You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by tl...@apache.org on 2021/01/15 09:40:31 UTC
[ignite] branch sql-calcite updated: IGNITE-13915 Calcite
improvements. Fix fragment ID on mapping, extend tests coverage,
use both client and server for starting queries (#8629)
This is an automated email from the ASF dual-hosted git repository.
tledkov pushed a commit to branch sql-calcite
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/sql-calcite by this push:
new 7a09246 IGNITE-13915 Calcite improvements. Fix fragment ID on mapping, extend tests coverage, use both client and server for starting queries (#8629)
7a09246 is described below
commit 7a09246b4cdc80b568c4eec838dcd40492f39822
Author: zstan <st...@gmail.com>
AuthorDate: Fri Jan 15 12:40:09 2021 +0300
IGNITE-13915 Calcite improvements. Fix fragment ID on mapping, extend tests coverage, use both client and server for starting queries (#8629)
---
.../processors/query/calcite/exec/rel/Inbox.java | 5 +
.../processors/query/calcite/prepare/Fragment.java | 10 --
.../query/calcite/prepare/QueryTemplate.java | 2 +-
.../query/calcite/CalciteQueryProcessorTest.java | 160 +++++++++++----------
4 files changed, 94 insertions(+), 83 deletions(-)
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
index c261b79..7bdc0ac 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/exec/rel/Inbox.java
@@ -108,6 +108,9 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
* @param comp Optional comparator for merge exchange.
*/
public void init(ExecutionContext<Row> ctx, RelDataType rowType, Collection<UUID> srcNodeIds, @Nullable Comparator<Row> comp) {
+ assert context().fragmentId() == ctx.fragmentId() : "different fragments unsupported: previous=" + context().fragmentId() +
+ " current=" + ctx.fragmentId();
+
// It's important to set proper context here because
// the one, that is created on a first message
// received doesn't have all context variables in place.
@@ -187,6 +190,8 @@ public class Inbox<Row> extends AbstractNode<Row> implements Mailbox<Row>, Singl
/** */
private void doPush() {
try {
+ checkState();
+
push();
}
catch (Exception e) {
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
index 90a5a5f..9db3bf1 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/Fragment.java
@@ -67,16 +67,6 @@ public class Fragment {
this(id, root, remotes, null, null);
}
- /**
- * @param id Fragment id.
- * @param root Root node of the fragment.
- * @param remotes Remote sources of the fragment.
- * @param rootSer Root serialized representation.
- */
- public Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer) {
- this(id, root, remotes, rootSer, null);
- }
-
/** */
Fragment(long id, IgniteRel root, List<IgniteReceiver> remotes, @Nullable String rootSer, @Nullable FragmentMapping mapping) {
this.id = id;
diff --git a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
index 583385c..5689994 100644
--- a/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
+++ b/modules/calcite/src/main/java/org/apache/ignite/internal/processors/query/calcite/prepare/QueryTemplate.java
@@ -119,7 +119,7 @@ public class QueryTemplate {
sender = new IgniteSender(sender.getCluster(), sender.getTraitSet(),
sender.getInput(), sender.exchangeId(), newTargetId, sender.distribution());
- fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes(), fragment0.serialized());
+ fragment0 = new Fragment(fragment0.fragmentId(), sender, fragment0.remotes());
}
}
diff --git a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
index 3f7e01a..4fc05bb 100644
--- a/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
+++ b/modules/calcite/src/test/java/org/apache/ignite/internal/processors/query/calcite/CalciteQueryProcessorTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.query.calcite;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -29,6 +30,7 @@ import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.QueryEntity;
import org.apache.ignite.cache.affinity.AffinityKeyMapped;
import org.apache.ignite.cache.query.FieldsQueryCursor;
+import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.configuration.CacheConfiguration;
@@ -50,17 +52,17 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_EXPERIMENTAL_SQL_E
@WithSystemProperty(key = "calcite.debug", value = "false")
public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** */
- private static IgniteEx ignite;
+ private static IgniteEx client;
/** {@inheritDoc} */
@Override protected void beforeTestsStarted() throws Exception {
startGrids(5);
- ignite = startClientGrid();
+ client = startClientGrid();
}
/** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
+ @Override protected void afterTest() {
for (Ignite ign : G.allGrids()) {
for (String cacheName : ign.cacheNames())
ign.destroyCache(cacheName);
@@ -75,21 +77,23 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** */
@Test
public void testCountWithJoin() throws Exception {
- IgniteCache<Integer, RISK> RISK = ignite.getOrCreateCache(new CacheConfiguration<Integer, RISK>()
+ IgniteCache<Integer, RISK> RISK = client.getOrCreateCache(new CacheConfiguration<Integer, RISK>()
.setName("RISK")
.setSqlSchema("PUBLIC")
- .setQueryEntities(F.asList(new QueryEntity(Integer.class, RISK.class).setTableName("RISK")))
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, RISK.class).setTableName("RISK")
+ .setKeyFields(new HashSet<>(Arrays.asList("TRADEID", "TRADEVER")))))
.setBackups(1)
);
- IgniteCache<Integer, TRADE> TRADE = ignite.getOrCreateCache(new CacheConfiguration<Integer, TRADE>()
+ IgniteCache<Integer, TRADE> TRADE = client.getOrCreateCache(new CacheConfiguration<Integer, TRADE>()
.setName("TRADE")
.setSqlSchema("PUBLIC")
- .setQueryEntities(F.asList(new QueryEntity(Integer.class, TRADE.class).setTableName("TRADE")))
+ .setQueryEntities(F.asList(new QueryEntity(Integer.class, TRADE.class).setTableName("TRADE")
+ .setKeyFields(new HashSet<>(Arrays.asList("TRADEID", "TRADEVER")))))
.setBackups(1)
);
- IgniteCache<Integer, BATCH> BATCH = ignite.getOrCreateCache(new CacheConfiguration<Integer, BATCH>()
+ IgniteCache<Integer, BATCH> BATCH = client.getOrCreateCache(new CacheConfiguration<Integer, BATCH>()
.setName("BATCH")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, BATCH.class).setTableName("BATCH")))
@@ -115,47 +119,51 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
awaitPartitionMapExchange(true, true, null);
- QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class);
-
// TODO: https://issues.apache.org/jira/browse/IGNITE-13849
// we have a problem with serialization/deserialization of MergeJoin
- List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
- "SELECT /*+ DISABLE_RULE('MergeJoinConverter') */ count(*)" +
- " FROM RISK R," +
- " TRADE T," +
- " BATCH B " +
- "WHERE R.BATCHKEY = B.BATCHKEY " +
- "AND R.TRADEID = T.TRADEID " +
- "AND R.TRADEVER = T.TRADEVER " +
- "AND T.BOOK = 'BOOK' " +
- "AND B.IS = TRUE");
+ String sqlCalc = "SELECT /*+ DISABLE_RULE('MergeJoinConverter') */ count(*)" +
+ " FROM RISK R," +
+ " TRADE T," +
+ " BATCH B " +
+ "WHERE R.BATCHKEY = B.BATCHKEY " +
+ "AND R.TRADEID = T.TRADEID " +
+ "AND R.TRADEVER = T.TRADEVER " +
+ "AND T.BOOK = 'BOOK' " +
+ "AND B.LS = TRUE";
+
+ // loop for test execution.
+ for (int i = 0; i < 10; i++) {
+ List<List<?>> resLoc = sql(sqlCalc);
+ assertEquals(40L, resLoc.get(0).get(0));
+ }
- List<List<?>> res = query.get(0).getAll();
+ //calcite
+ List<List<?>> res1 = sql(sqlCalc);
- assertEquals(1, res.size());
- assertEquals(1, res.get(0).size());
- assertEquals(40L, res.get(0).get(0));
+ assertEquals(1, res1.size());
+ assertEquals(1, res1.get(0).size());
+ assertEquals(40L, res1.get(0).get(0));
}
/** */
public static class RISK {
/** */
@QuerySqlField
- public Integer BATCHKEY;
+ public Integer batchKey;
/** */
@QuerySqlField
- public Integer TRADEID;
+ public Integer tradeId;
/** */
@QuerySqlField
- public Integer TRADEVER;
+ public Integer tradeVer;
/** */
public RISK(Integer in) {
- BATCHKEY = in;
- TRADEID = in;
- TRADEVER = in;
+ batchKey = in;
+ tradeId = in;
+ tradeVer = in;
}
}
@@ -163,21 +171,21 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
public static class TRADE {
/** */
@QuerySqlField
- public Integer TRADEID;
+ public Integer tradeId;
/** */
@QuerySqlField
- public Integer TRADEVER;
+ public Integer tradeVer;
/** */
@QuerySqlField
- public String BOOK;
+ public String book;
/** */
public TRADE(Integer in) {
- TRADEID = in;
- TRADEVER = in;
- BOOK = (in & 1) != 0 ? "BOOK" : "";
+ tradeId = in;
+ tradeVer = in;
+ book = (in & 1) != 0 ? "BOOK" : "";
}
}
@@ -185,37 +193,37 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
public static class BATCH {
/** */
@QuerySqlField
- public Integer BATCHKEY;
+ public Integer batchKey;
/** */
@QuerySqlField
- public Boolean IS;
+ public Boolean ls;
/** */
public BATCH(Integer in) {
- BATCHKEY = in;
- IS = (in & 1) != 0;
+ batchKey = in;
+ ls = (in & 1) != 0;
}
}
/** */
@Test
public void unionAll() throws Exception {
- IgniteCache<Integer, Employer> employer1 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> employer1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("employer1")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER1")))
.setBackups(1)
);
- IgniteCache<Integer, Employer> employer2 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> employer2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("employer2")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER2")))
.setBackups(2)
);
- IgniteCache<Integer, Employer> employer3 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> employer3 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("employer3")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER3")))
@@ -231,39 +239,33 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
awaitPartitionMapExchange(true, true, null);
- QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
-
- List<FieldsQueryCursor<List<?>>> query = engine.query(null, "PUBLIC",
- "SELECT * FROM employer1 " +
- "UNION ALL " +
- "SELECT * FROM employer2 " +
- "UNION ALL " +
- "SELECT * FROM employer3 ");
-
- assertEquals(1, query.size());
+ List<List<?>> rows = sql("SELECT * FROM employer1 " +
+ "UNION ALL " +
+ "SELECT * FROM employer2 " +
+ "UNION ALL " +
+ "SELECT * FROM employer3 ");
- List<List<?>> rows = query.get(0).getAll();
assertEquals(6, rows.size());
}
/** */
@Test
public void union() throws Exception {
- IgniteCache<Integer, Employer> employer1 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> employer1 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("employer1")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER1")))
.setBackups(1)
);
- IgniteCache<Integer, Employer> employer2 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> employer2 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("employer2")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER2")))
.setBackups(2)
);
- IgniteCache<Integer, Employer> employer3 = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> employer3 = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("employer3")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("EMPLOYER3")))
@@ -296,14 +298,14 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** */
private void populateTables() throws InterruptedException {
- IgniteCache<Integer, Employer> orders = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> orders = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("orders")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("orders")))
.setBackups(2)
);
- IgniteCache<Integer, Employer> account = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> account = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("account")
.setSqlSchema("PUBLIC")
.setQueryEntities(F.asList(new QueryEntity(Integer.class, Employer.class).setTableName("account")))
@@ -405,7 +407,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
populateTables();
List<List<?>> rows = sql(
- "SELECT distinct(name) FROM Orders o WHERE name IN (" +
+ "SELECT distinct(name) FROM Orders o WHERE name IN (" +
" SELECT name" +
" FROM Account)");
@@ -453,7 +455,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
populateTables();
List<List<?>> rows = sql(
- "SELECT name FROM Orders o WHERE EXISTS (" +
+ "SELECT name FROM Orders o WHERE EXISTS (" +
" SELECT 1" +
" FROM Account a" +
" WHERE o.name = a.name)");
@@ -468,7 +470,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
populateTables();
List<List<?>> rows = sql(
- "EXPLAIN PLAN FOR SELECT name FROM Orders o WHERE NOT EXISTS (" +
+ "EXPLAIN PLAN FOR SELECT name FROM Orders o WHERE NOT EXISTS (" +
" SELECT 1" +
" FROM Account a" +
" WHERE o.name = a.name)");
@@ -484,7 +486,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
assertEquals(3, rows.size());
rows = sql(
- "SELECT distinct(name) FROM Orders o WHERE NOT EXISTS (" +
+ "SELECT distinct(name) FROM Orders o WHERE NOT EXISTS (" +
" SELECT name" +
" FROM Account a" +
" WHERE o.name = a.name)");
@@ -517,7 +519,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** */
@Test
public void aggregate() throws Exception {
- IgniteCache<Integer, Employer> employer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
+ IgniteCache<Integer, Employer> employer = client.getOrCreateCache(new CacheConfiguration<Integer, Employer>()
.setName("employer")
.setSqlSchema("PUBLIC")
.setIndexedTypes(Integer.class, Employer.class)
@@ -694,7 +696,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** */
@Test
public void testInsertUpdateDeleteNonPrimitiveKey() throws Exception {
- IgniteCache<Key, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Key, Developer>()
+ IgniteCache<Key, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Key, Developer>()
.setName("developer")
.setSqlSchema("PUBLIC")
.setIndexedTypes(Key.class, Developer.class)
@@ -766,7 +768,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** for test purpose only. */
public void testThroughput() {
- IgniteCache<Integer, Developer> developer = ignite.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
+ IgniteCache<Integer, Developer> developer = client.getOrCreateCache(new CacheConfiguration<Integer, Developer>()
.setCacheMode(CacheMode.REPLICATED)
.setName("developer")
.setSqlSchema("PUBLIC")
@@ -785,7 +787,7 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
developer.put(i, new Developer("Name" + i, prId));
}
- QueryEngine engine = Commons.lookupComponent(ignite.context(), QueryEngine.class);
+ QueryEngine engine = Commons.lookupComponent(client.context(), QueryEngine.class);
// warmup
for (int i = 0; i < numIterations; i++) {
@@ -802,13 +804,13 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
// warmup
for (int i = 0; i < numIterations; i++) {
- List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
+ List<FieldsQueryCursor<List<?>>> query = client.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
query.get(0).getAll();
}
start = System.currentTimeMillis();
for (int i = 0; i < numIterations; i++) {
- List<FieldsQueryCursor<List<?>>> query = ignite.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
+ List<FieldsQueryCursor<List<?>>> query = client.context().query().querySqlFields(new SqlFieldsQuery("select * from DEVELOPER").setSchema("PUBLIC"), false, false);
query.get(0).getAll();
}
System.out.println("H2 duration = " + (System.currentTimeMillis() - start));
@@ -816,11 +818,25 @@ public class CalciteQueryProcessorTest extends GridCommonAbstractTest {
/** */
private List<List<?>> sql(String sql) {
- QueryEngine engine = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+ QueryEngine engineSrv = Commons.lookupComponent(grid(1).context(), QueryEngine.class);
+
+ assertTrue(client.configuration().isClientMode());
+
+ QueryEngine engineCli = Commons.lookupComponent(client.context(), QueryEngine.class);
- List<FieldsQueryCursor<List<?>>> cursors = engine.query(null, "PUBLIC", sql);
+ List<FieldsQueryCursor<List<?>>> cursorsSrv = engineSrv.query(null, "PUBLIC", sql);
+
+ List<FieldsQueryCursor<List<?>>> cursorsCli = engineCli.query(null, "PUBLIC", sql);
+
+ List<List<?>> allSrv;
+
+ try (QueryCursor srvCursor = cursorsSrv.get(0); QueryCursor cliCursor = cursorsCli.get(0)) {
+ allSrv = srvCursor.getAll();
+
+ assertEquals(allSrv.size(), cliCursor.getAll().size());
+ }
- return cursors.get(0).getAll();
+ return allSrv;
}
/** */